我们使用 60 个分区,有 70 个消费者(比分区多 10 个),任何分区中都没有消费者滞后,但重新平衡不断循环发生。 每个kafkaEvent的处理时间是10ms
以下是我得到的 CommitFailed 异常
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
我正在使用reactor-kafka:1.3.12,reactor-core:3.4.18,kafka-clients:3.0.1,Java 17版本和Tomcat Web服务器
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CommentedbootStrapServersUrl");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"user-groupid-1");
props.put(ConsumerConfig.CLIENT_ID_CONFIG,"client-1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,360000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,300500);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,30500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,300000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
ReceiverOptions<String,String> receiverOptions = ReceiverOptions.<String,String>create(props)
.addAssignListener(partitions ->LOGGER.info("onPartitionAssigned {}",partitions))
.addRevokeListener(partitions -> LOGGER.info("onPartitionRevoked {}",partitions))
.commitRetryInterval(Duration.ofMillis(3000))
.commitBatchSize(3)
.commitInterval(Duration.ofMillis(2000))
.maxDelayRebalance(Duration.ofSeconds(60))
.subscription(Collections.singleton("commentedTopicName"));
Flux<ReceiverRecord<String,String>> inboundFlux = KafkaReceiver.create(receiverOptions)
.receive()
.onErrorContinue((throwable,o) -> {
LOGGER.error("Error while consuming in this pod");
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).transientErrors(true))
.repeat();
try {
Scheduler schedulers = Schedulers.newBoundedElastic(2,2,"UThread");
inboundFlux
.publishOn(schedulers)
.doOnNext(kafkaEvent -> {
callBusinessLogic(kafkaEvent);
kafkaEvent.receiverOffset().acknowledge();
})
.doOnError(e->e.printStackTrace())
.subscribe();
} catch (Exception e){
e.printStackTrace();
}
需要帮助解决重新平衡问题,消费者有时很稳定,但如果重新平衡开始,那么它永远不会停止。
我假设当您说 70 个消费者时,您指的是一个组内的消费者实例,而不是 70 个不同的消费者组。我认为你不断重新平衡的原因是你的消费者多于分区。这意味着一些消费者永远不会获得分区。我认为这对于重新平衡来说是一个糟糕的状态。即使这不是问题的原因,拥有比分区更多的消费者也没有任何好处。分区的数量是您可以拥有的最大数量或并行化。尝试将消费者数量减少到 60 个,或将分区数量增加到 70 个。 持续重新平衡的另一个常见原因是一个或多个消费者发送心跳的时间超过了超时时间。当处理一批数据需要很长时间时,就会发生这种情况。延长超时时间或请求较小的批次。