组内成员失败触发Kafka中不断(重新)加入组。

问题描述 投票:1回答:1

我的应用程序正在从kafka读取一个主题,在丰富它之后,它保存到另一个主题。这个 StreamsConfig.NUM_STREAM_THREADS_CONFIG 被配置为 8. 我有两个经纪人和12个分区。

Topic: enriched-request PartitionCount: 12  ReplicationFactor: 2    Configs: min.insync.replicas=1,flush.ms=86400000,segment.bytes=1073741824,flush.messages=1073741824,max.message.bytes=1000000,index.interval.bytes=4096,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=259200000,segment.ms=604800000,segment.index.bytes=10485760
    Topic: enriched-request Partition: 0    Leader: 7   Replicas: 7,8   Isr: 7,8
    Topic: enriched-request Partition: 1    Leader: 8   Replicas: 8,7   Isr: 7,8
    Topic: enriched-request Partition: 2    Leader: 7   Replicas: 7,8   Isr: 7,8
    Topic: enriched-request Partition: 3    Leader: 8   Replicas: 8,7   Isr: 7,8
    Topic: enriched-request Partition: 4    Leader: 7   Replicas: 7,8   Isr: 7,8
    Topic: enriched-request Partition: 5    Leader: 8   Replicas: 8,7   Isr: 7,8
    Topic: enriched-request Partition: 6    Leader: 7   Replicas: 7,8   Isr: 7,8
    Topic: enriched-request Partition: 7    Leader: 8   Replicas: 8,7   Isr: 7,8
    Topic: enriched-request Partition: 8    Leader: 7   Replicas: 7,8   Isr: 7,8
    Topic: enriched-request Partition: 9    Leader: 8   Replicas: 8,7   Isr: 7,8
    Topic: enriched-request Partition: 10   Leader: 7   Replicas: 7,8   Isr: 7,8
    Topic: enriched-request Partition: 11   Leader: 8   Replicas: 8,7   Isr: 7,8

从两个星期前开始,在我的测试环境中,我就收到了日志信息。INFO AbstractCoordinator:336 - [Consumer clientId=my-enrichments-client-StreamThread-4-consumer, groupId=my-enrichments] (Re-)joining group 所有8个线程都会出现这种情况。每隔5分钟就会触发一次,每次都是一组新的8个线程。

在我的 kafka.log 我明白了。Member my-enrichments-client-StreamThread-4-consumer-6409090a-9d06-4bc0-8dd0-cd4c8bd28d71 in group my-enrichments has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) 又是8条线 这里和上面一样,每隔5分钟这些都是新的8个线程的集合,它正在删除。

在我的测试环境中,只有一个应用程序在运行。我试着等待15-20分钟重新部署,但我一直得到同样的错误。有谁知道如何解决这个问题,而不需要改变我的 StreamsConfig.CLIENT_ID_CONFIGStreamsConfig.APPLICATION_ID_CONFIG?

消费者配置

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-enrichments");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "my-enrichments-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, my bootstrap sersvers);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
streamsConfiguration.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

如果能帮上忙的话,在我增加了线程数量后的一周左右,它就开始发生了。38. 我不知道是否与此有关。

apache-kafka kafka-consumer-api
1个回答
0
投票

由于我找不到答案,我尝试了以下方法。

  1. 描述一下这个组 bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --describe --group my-enrichments. 这样我就得到了警告 Warning: Consumer group 'my-enrichments' is rebalancing.
  2. 删掉它。bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --delete --group my-enrichments 这导致了错误。
Error: Deletion of some consumer groups failed:
* Group 'my-enrichments' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
  1. 重新启动经纪人就能解决这个问题
© www.soinside.com 2019 - 2024. All rights reserved.