我已经开始学习confluence kafka(python)。有 1 个生产者、1 个主题、1 个分区和 1 个消费者(简单设置)。我的要求是我希望集中获取数据。我读到使用 poll(some_time) 将等待所需的时间并批量/列表中获取记录。我认为这将是一个简单的迭代过程,例如:
msgs = 消费者.poll(1000) 对于消息中的消息: 采取一些行动............
问题是我无法迭代这个“msgs”对象 - 我扫描了文档以实现 poll 返回一条消息 - 那么有没有办法获取消息列表?(解决方法是频繁获取消息的子集)这需要时间,然后集体处理数据 - 但希望有另一种方法)。 poll()迭代过程似乎与kafka-python的方式不同。
这是因为“poll”方法返回一个字典,该字典将主题分区映射到每个分区的记录列表。 你可以做这样的事情:
consumer.subscribe(['topic'])
while True:
msgs = consumer.poll(0.5)
for tp, records in msgs.items(): # tp is a TopicPartition and represents a specific topic and partition
for msg in records:
# ...