我有以下程序来消费所有传入 Kafka 的消息。
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'])
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
使用上面的程序我能够消费所有传入 Kafka 的消息。但是一旦所有消息都被消耗掉,我想关闭卡夫卡消费者,但这并没有发生。我同样需要帮助。
如果我向 KafkaConsumer 对象提供 consumer_timeout_ms 参数,我现在就可以关闭 Kafka Consumer。它接受以毫秒为单位的超时值。 下面是代码片段。
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'],
consumer_timeout_ms=1000)
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
在上面的代码中,如果消费者在 1 秒内没有看到任何消息,它将关闭会话。
enable.partition.eof 就是您所需要的。当将此配置设置为 true 时。每当消费者到达分区末尾时,它将发出 PARTITION_EOF 事件。因此,您可以通过某些回调函数知道何时到达分区的末尾。这样,当您到达所有分区的末尾时,您可以选择关闭消费者。
您只需添加一个条件,如果满足,您就可以中断 for 循环:
for message in consumer:
if condition:
break
在您的情况下,您希望在所有消息都被消费时停止,因此您必须找到一种方法来告诉消费者所有消息都已到达。例如,您可以生成一条消息,其中可能包含该信息,然后您的条件将检查所使用的消息是否是报告所有消息已到达的消息。
之前提到的另一个例子只是假设,如果在一定时间内没有消息到达(这里建议 1 秒,但也许多几秒至少可能会更好),这意味着没有更多消息到达.
我这样做的方法是检查我收到的所有 ID 是否至少被计算一次(以避免重复),但这需要您确切地知道您收到的内容以及一些可能超出本范围的逻辑问题,但我发现这是一种非常有用且优雅的方法来确定如何停止消费,以下是您需要的一些代码:
sum = 0
data = {
0: None,
1: None,
2: None,
3: None
}
for message in consumer:
payload = msg.value
unique_id = payload["unique_id"]
if data[unique_id] is None:
data[unique_id] = payload
sum += 1
if len(data) == sum:
break