我有一个使用 docker compose 运行的 kafka 集群:
broker1:
image: confluentinc/cp-kafka:7.4.0
hostname: broker1
container_name: broker1
depends_on:
- controller
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker'
KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
controller:
image: confluentinc/cp-kafka:7.4.0
hostname: controller
container_name: controller
ports:
- "9093:9093"
- "9102:9102"
environment:
KAFKA_NODE_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9102
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
注意
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
这意味着 kafka 配置为自动创建主题。
我的 Nest 客户端配置:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092']
}
},
consumer: {
groupId: 'my-consumer-group',
allowAutoTopicCreation: true
}
}
)
为了消费特定主题,我使用
@EventPattern
装饰器。
我希望应用程序将创建我的消费者正在使用的主题,但我收到此错误:
ERROR [Connection] Response Meadata(key: 3, version: 6) {"timestamp": "2023-08-09T10:20:17.462Z", "logger": "kafkajs", "broker": "localhost:9092", "clientId": "nestjs-consumer-server", "error": "This server does noy host this topic-partition", "correlationId": 7, "size": 371"}
KafkaJSProtocolError: This server does not host this topic-partition
....
要说的重要一点是,相同的代码和配置适用于不同的 kafka 堆栈(我只是没有有关最后一个堆栈的特定图像和版本的所有信息)
我不明白是kafka配置有问题,还是kafka不允许消费者自动创建主题,或者nest有问题?
编辑:
我发现应用程序每次都会创建两个主题,然后失败,所以我认为这意味着 Kafka 已正确配置为自动主题创建,问题出在 Nest 应用程序或 kafkajs 中。
这不是一个完美的解决方案,但最终对我有用的是使用 Kafka 和 Zookeeper 而不是 kraft,这解决了错误并使消费者能够自动创建所有主题。但更好的解决方案可能是在消费者尝试使用主题之前创建主题。