使用kafka-python定期轮询Kafka使用者的最佳方法是什么?

问题描述 投票:0回答:2

我有多个生产者,正在将数据输入Kafka。我希望每小时运行一次Consumer,以一次获取所有累积的数据并进行进一步处理。


我想到的选项是:

  • 使用python线程并使用等效的setInterval来调用Consumer
  • 设置max_poll_interval_ms变量:(如其他一些答案所述)。但是,官方文件指出

这将限制消费者可以花费的时间在获取更多记录之前保持空闲状态。如果之前未调用poll()此超时到期后,则认为使用者失败了,小组将重新平衡这听起来不像是让消费者入睡然后再次解雇的原因。

  • 不是每小时轮询,而是跟踪消费者抵销并在将10,000条记录附加到Kafka之后进行轮询

但是,我想在消费者本身中进行管理。最好的方法是什么?

apache-kafka kafka-consumer-api polling kafka-python
2个回答
1
投票

每小时使用Cron或OS调度程序调用脚本。

如果需要等待10k条记录来完成任何有用的事情,那么我不确定Kafka是否适合该体系结构。另外,消费者的滞后实际上会一直落后[]


0
投票

如果您阅读了max_poll_interval_ms的官方文档,则这是使用者可以空闲的最大间隔。之后,该消费者被视为已死,并且发生了消费者群体重新平衡。

© www.soinside.com 2019 - 2024. All rights reserved.