我正在写一个Kafka消费者并且为了学习目的,这次我想到了使用Spring-Kafka实现。直到现在,我正在使用Java API来编写消费者。
我想手动管理偏移量,所以我在Spring-Kafka包中搜索类似于ConsumerRebalanceListener的东西。为了我的成功,我在Spring中遇到了ConsumerAwareRebalanceListener,它可以代替ConsumerRebalanceListener使用。
但是当我查看ConsumerAwareRebalanceListener接口时,可以看到2个方法 - onPartitionsRevokedBeforeCommit和onPartitionsRevokedAfterCommit,它们在Kafka java API中不可用。
有人可以解释一下我在何处/何处可以使用这种方法?
P.S - 看看Spring-Kafka实现,但不太明白它在哪里有用。
Spring kafka有一个消息驱动的消费者模型;您提供POJO消息侦听器,框架执行轮询并将消息传递给侦听器,可以是一次一个也可以是批处理。
它有各种提交偏移的模式(它更喜欢在客户端关闭enable.auto.commmit
)。
手动acks AckMode.MANUAL
和AckMode.MANUAL_IMMEDIATE
有两种模式;使用这些模式,我们将Acknowledgment
对象传递给侦听器bean,然后调用ack.acknowledge()
。
当模式为MANUAL_IMMEDIATE
时,只要在消费者线程上调用acknowledge()
,就会直接调用使用者。
当模式为MANUAL
时,偏移量将添加到内部队列,并且提交将在处理轮询结果时结束。
同样,有几种“自动”ack模式;主要是RECORD
和BATCH
,当听众正常退出时,容器会提供抵消。在记录模式下,在处理每个记录之后发送提交,在批处理模式下,在处理完所有轮询结果之后完成提交。
批量提交偏移更有效,但会增加重复交付的风险。
我们还在发生重新平衡时提交任何挂起的偏移量。
那么,为什么两个onPartitionsRevoked*
方法呢?
当使用MANUAL,BATCH或其他可能具有挂起偏移量的AckMode
s时,在提交挂起的偏移量之前调用onPartitionsRevokedBeforeCommit()
,并在提交这些偏移量后调用onPartitionsRevokedAfterCommit()
。
因此,consumer.position()
可能会在每种方法中返回不同的结果。
大多数人会对onPartitionsRevokedAfterCommit()
感兴趣,但我们觉得我们应该提供两种选择。
如果你使用AckMode.MANUAL_IMMEDIATE
或AckMode.RECORD
,应该没有区别,因为没有待定的ack。
但是,由于在消费线程上调用了侦听器,因此在轮询期间,使用基于时间或基于计数的AckMode
s之一时实际上只会有差异。使用其他ackmodes,我们已经承诺了抵消。
希望很清楚。