延迟Kafka主题中的一些记录

问题描述 投票:0回答:1

我有一个消费者项目,它使用Kafka主题的数据。该流中90%的数据可以实时处理,但对于特定记录(~10%),我需要延迟处理。

我应该在同一个JVM中有两个独立的消费者,并且消耗一个消费者中90%的记录而忽略10%并让其他消费者处理它或将10%的消息推送到另一个主题并延迟另一个消息的处理话题?

如果我能有一个单一的消费者和两个检查点机制,一个90%,另一个延迟10%,但Kafka客户似乎不支持这个用例,那将是很好的。这将有助于我避免任何不必要的反序列化和网络IO。

java apache-kafka kafka-consumer-api
1个回答
1
投票

对于单个消费者来说,不可能有多个检查点 - 要么使用两个消费者,要么使用两个主题。

  • 两个消费者的一个问题是所有消息都会被读取两次,而不是在同一时间读取,这会导致问题知道消息何时是历史消息而不是消息:如果今天发出的消息是在23处读取的话会发生什么? :59:59由一个消费者和00:00:01由另一个消费者(相同的发行与相对时间)。您可以使用一些滞后和监视滞后来防止此问题。
  • 将您的数据分为两个主题。您可以使用kafka流或任何其他流处理工具。例如,您的events主题将被处理并分为两个主题historical-eventsrealtime-events。您仍然会有两个消费者,但不会有相同的主题。正如您所建议的那样,您也可以只使用events主题,处理即时数据并将历史数据发送到其他主题(因此您有两个主题而不是三个主题,并且承诺偏移没有问题) - 但这意味着更多的IO用于该过程客户,以及客户的两项责任 由于您只有一个消费者从基本主题中读取每条消息,因此它将始终是最近的或历史的,因此您不会遇到上一个问题。
  • 只有一个消费者会相应地处理消息,但正如您所指出的那样,使用偏移提交会产生问题,并且在给定历史批次时可能会使用大量RAM。关于偏移提交,您可以简单地存储(在另一个kafka压缩主题中与_consumer_offset相同的方式)关于历史或当前偏移的最后一个偏移,以及重新启动时,从历史批处理中恢复,并忽略所有“最近”数据直到达到正确的偏移量。这是可能的,但使用更多的RAM,并且更麻烦。

您的选择在很大程度上取决于您的问题(IO,RAM,只是有正确的行为)。从一开始就分离两个主题可能是最容易实现的,可以在一个单独的过程中完成,有效地分离每个过程的责任,并将对处理客户端的影响降至最低。

© www.soinside.com 2019 - 2024. All rights reserved.