onPartitionsRevokedBeforeCommit vs onPartitionsRevokedAfterCommit

问题描述 投票:0回答:1

我正在写一个Kafka消费者并且为了学习目的,这次我想到了使用Spring-Kafka实现。直到现在,我正在使用Java API来编写消费者。

我想手动管理偏移量,所以我在Spring-Kafka包中搜索类似于ConsumerRebalanceListener的东西。为了我的成功,我在Spring中遇到了ConsumerAwareRebalanceListener,它可以代替ConsumerRebalanceListener使用。

但是当我查看ConsumerAwareRebalanceListener接口时,可以看到2个方法 - onPartitionsRevokedBeforeCommit和onPartitionsRevokedAfterCommit,它们在Kafka java API中不可用。

有人可以解释一下我在何处/何处可以使用这种方法?

P.S - 看看Spring-Kafka实现,但不太明白它在哪里有用。

spring apache-kafka spring-kafka
1个回答
1
投票

Spring kafka有一个消息驱动的消费者模型;您提供POJO消息侦听器,框架执行轮询并将消息传递给侦听器,可以是一次一个也可以是批处理。

它有各种提交偏移的模式(它更喜欢在客户端关闭enable.auto.commmit)。

手动acks AckMode.MANUALAckMode.MANUAL_IMMEDIATE有两种模式;使用这些模式,我们将Acknowledgment对象传递给侦听器bean,然后调用ack.acknowledge()

当模式为MANUAL_IMMEDIATE时,只要在消费者线程上调用acknowledge(),就会直接调用使用者。

当模式为MANUAL时,偏移量将添加到内部队列,并且提交将在处理轮询结果时结束。

同样,有几种“自动”ack模式;主要是RECORDBATCH,当听众正常退出时,容器会提供抵消。在记录模式下,在处理每个记录之后发送提交,在批处理模式下,在处理完所有轮询结果之后完成提交。

批量提交偏移更有效,但会增加重复交付的风险。

我们还在发生重新平衡时提交任何挂起的偏移量。

那么,为什么两个onPartitionsRevoked*方法呢?

当使用MANUAL,BATCH或其他可能具有挂起偏移量的AckModes时,在提交挂起的偏移量之前调用onPartitionsRevokedBeforeCommit(),并在提交这些偏移量后调用onPartitionsRevokedAfterCommit()

因此,consumer.position()可能会在每种方法中返回不同的结果。

大多数人会对onPartitionsRevokedAfterCommit()感兴趣,但我们觉得我们应该提供两种选择。

如果你使用AckMode.MANUAL_IMMEDIATEAckMode.RECORD,应该没有区别,因为没有待定的ack。

但是,由于在消费线程上调用了侦听器,因此在轮询期间,使用基于时间或基于计数的AckModes之一时实际上只会有差异。使用其他ackmodes,我们已经承诺了抵消。

希望很清楚。

© www.soinside.com 2019 - 2024. All rights reserved.