我有一个用例,其中我有3个Kafka消费者在写一个主题,每个消费者中的消息需要按顺序进行处理。假如,其中一个使用者有滞后,则需要早些处理的消息将被丢弃(写条件)。因此,有没有一种方法可以保持这些消息的顺序。
消息始终在Kafka分区中排序。通常,属于某个键的所有消息都位于某个分区中(通过分区逻辑)。
我有一个用例,其中有3个Kafka消费者在写一个主题
我想,您是说您有3个使用者reading来自一个主题
这里有2种情况:
万一#1
您可以有3个使用者,每个使用者具有不同的group.id
,以便每个使用者都将使用所有消息集。在这里,速度较慢的消费者不会减慢其他消费者的速度。因为每个使用者通常在其自己的线程或进程中运行。
万一#2
您可以拥有3个具有相同group.id
的使用者,以便每个使用者将获得自己的分区份额。一个消费者消耗的消息将不会被另一个消费者消耗。同样在这里,较慢的消费者不会减慢其他消费者的速度。因为每个使用者只会使用自己的一组分区。
[如果,如果其中一个消费者存在滞后,则一条消息需要更早处理的将被丢弃(写条件)
[Kafka中没有隐式删除,您只需要在轮询后自己删除该消息即可。
[我认为,要检查滞后,您可以先从consumer.endOffsets()
和consumer.position()
开始,两者之间的差异应为您提供滞后。根据延迟,您可以选择删除消息。