在不同服务器上运行的消费者组中的重复项

问题描述 投票:0回答:1

我面临重复问题,当相同的消费者组在两台不同的服务器上运行时,如果一台服务器的消费者组出现故障,另一台服务器会创建重复项。 你能给我一个解决方案吗,如何使用 scala 防止重复?

所有配置,我都尝试过,但重复仍然存在

  properties.put(StreamsConfig.EXACTLY_ONCE,true)
  properties.put("group.id", "groupid")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,5)
  properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
  properties.put(ProducerConfig.ACKS_CONFIG,"all")
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true)
apache-kafka kafka-consumer-api
1个回答
0
投票

我假设您有一个分区在两个实例之间共享?

您需要配置一个 Consumer Rebalance Listener 以在

onPartitionsRevoked
期间提交已处理的偏移量。这样,当下一个实例命中其侦听器的
onPartitionsAssigned
时,它可以查找已提交、已撤销实例的偏移量。

如果还没有的话,您还应该禁用自动提交并尝试在生产者上启用事务。

注意:Kafka Streams 要求您使用 ConsumerPrefix 和 ProducerPrefix 静态函数进行内部客户端设置

© www.soinside.com 2019 - 2024. All rights reserved.