我有多个生产者,正在将数据输入Kafka。我希望每小时运行一次Consumer,以一次获取所有累积的数据并进行进一步处理。
我想到的选项是:
这将限制消费者可以花费的时间在获取更多记录之前保持空闲状态。如果之前未调用poll()此超时到期后,则认为使用者失败了,小组将重新平衡这听起来不像是让消费者入睡然后再次解雇的原因。
但是,我想在消费者本身中进行管理。最好的方法是什么?
每小时使用Cron或OS调度程序调用脚本。
如果需要等待10k条记录来完成任何有用的事情,那么我不确定Kafka是否适合该体系结构。另外,消费者的滞后实际上会一直落后[]
如果您阅读了max_poll_interval_ms的官方文档,则这是使用者可以空闲的最大间隔。之后,该消费者被视为已死,并且发生了消费者群体重新平衡。