Python 中的 ConfluenceKafka:使用消费者到消费者记录集

问题描述 投票:0回答:1

我已经开始学习confluence kafka(python)。有 1 个生产者、1 个主题、1 个分区和 1 个消费者(简单设置)。我的要求是我希望集中获取数据。我读到使用 poll(some_time) 将等待所需的时间并批量/列表中获取记录。我认为这将是一个简单的迭代过程,例如:

msgs = 消费者.poll(1000) 对于消息中的消息: 采取一些行动............

问题是我无法迭代这个“msgs”对象 - 我扫描了文档以实现 poll 返回一条消息 - 那么有没有办法获取消息列表?(解决方法是频繁获取消息的子集)这需要时间,然后集体处理数据 - 但希望有另一种方法)。 poll()迭代过程似乎与kafka-python的方式不同。

python apache-kafka kafka-consumer-api confluent-platform
1个回答
0
投票

这是因为“poll”方法返回一个字典,该字典将主题分区映射到每个分区的记录列表。 你可以做这样的事情:

consumer.subscribe(['topic'])

while True:
    msgs = consumer.poll(0.5)  
    for tp, records in msgs.items():  # tp is a TopicPartition and represents a specific topic and partition
        for msg in records:
            # ...
© www.soinside.com 2019 - 2024. All rights reserved.