我的应用程序正在从kafka读取一个主题,在丰富它之后,它保存到另一个主题。这个 StreamsConfig.NUM_STREAM_THREADS_CONFIG
被配置为 8
. 我有两个经纪人和12个分区。
Topic: enriched-request PartitionCount: 12 ReplicationFactor: 2 Configs: min.insync.replicas=1,flush.ms=86400000,segment.bytes=1073741824,flush.messages=1073741824,max.message.bytes=1000000,index.interval.bytes=4096,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=259200000,segment.ms=604800000,segment.index.bytes=10485760
Topic: enriched-request Partition: 0 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 1 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 2 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 3 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 4 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 5 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 6 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 7 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 8 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 9 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 10 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 11 Leader: 8 Replicas: 8,7 Isr: 7,8
从两个星期前开始,在我的测试环境中,我就收到了日志信息。INFO AbstractCoordinator:336 - [Consumer clientId=my-enrichments-client-StreamThread-4-consumer, groupId=my-enrichments] (Re-)joining group
所有8个线程都会出现这种情况。每隔5分钟就会触发一次,每次都是一组新的8个线程。
在我的 kafka.log
我明白了。Member my-enrichments-client-StreamThread-4-consumer-6409090a-9d06-4bc0-8dd0-cd4c8bd28d71 in group my-enrichments has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
又是8条线 这里和上面一样,每隔5分钟这些都是新的8个线程的集合,它正在删除。
在我的测试环境中,只有一个应用程序在运行。我试着等待15-20分钟重新部署,但我一直得到同样的错误。有谁知道如何解决这个问题,而不需要改变我的 StreamsConfig.CLIENT_ID_CONFIG
或 StreamsConfig.APPLICATION_ID_CONFIG
?
消费者配置
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-enrichments");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "my-enrichments-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, my bootstrap sersvers);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
streamsConfiguration.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
如果能帮上忙的话,在我增加了线程数量后的一周左右,它就开始发生了。3
到 8
. 我不知道是否与此有关。
由于我找不到答案,我尝试了以下方法。
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --describe --group my-enrichments
. 这样我就得到了警告 Warning: Consumer group 'my-enrichments' is rebalancing.
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --delete --group my-enrichments
这导致了错误。Error: Deletion of some consumer groups failed:
* Group 'my-enrichments' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.