Kafka 与 Nestjs 错误:KafkaJSProtocolError:此服务器不托管此主题分区

问题描述 投票:0回答:1

我有一个使用 docker compose 运行的 kafka 集群:

broker1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker1
    container_name: broker1
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

注意

KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
这意味着 kafka 配置为自动创建主题。

我的 Nest 客户端配置:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092']
        }
      },
      consumer: {
        groupId: 'my-consumer-group',
        allowAutoTopicCreation: true
      }
    }
)

为了消费特定主题,我使用

@EventPattern
装饰器。

我希望应用程序将创建我的消费者正在使用的主题,但我收到此错误:

ERROR [Connection] Response Meadata(key: 3, version: 6) {"timestamp": "2023-08-09T10:20:17.462Z", "logger": "kafkajs", "broker": "localhost:9092", "clientId": "nestjs-consumer-server", "error": "This server does noy host this topic-partition", "correlationId": 7, "size": 371"}

KafkaJSProtocolError: This server does not host this topic-partition
....

要说的重要一点是,相同的代码和配置适用于不同的 kafka 堆栈(我只是没有有关最后一个堆栈的特定图像和版本的所有信息)

我不明白是kafka配置有问题,还是kafka不允许消费者自动创建主题,或者nest有问题?

编辑:

我发现应用程序每次都会创建两个主题,然后失败,所以我认为这意味着 Kafka 已正确配置为自动主题创建,问题出在 Nest 应用程序或 kafkajs 中。

apache-kafka nest kafkajs
1个回答
0
投票

这不是一个完美的解决方案,但最终对我有用的是使用 Kafka 和 Zookeeper 而不是 kraft,这解决了错误并使消费者能够自动创建所有主题。但更好的解决方案可能是在消费者尝试使用主题之前创建主题。

© www.soinside.com 2019 - 2024. All rights reserved.