Python Kafka 消费者在消息到达时不读取消息

问题描述 投票:0回答:2

我刚刚开始使用 Kafka,kafka-python。在下面的代码中,我尝试在消息到达时读取它们。但由于某种原因,消费者似乎要等到一定数量的消息积累后再获取它们。

我一开始还以为是制作方批量发布的原因。当我运行“kafka-console-consumer --bootstrap-servers --topic”时,我可以看到发布后立即收到的每条消息(如消费者控制台上所示)

但是Python脚本无法以同样的方式接收消息。

def run():
    success_consumer = KafkaConsumer('success_logs',
                                     bootstrap_servers=KAFKA_BROKER_URL,
                                     group_id=None,
                                     fetch_min_bytes=1,
                                     fetch_max_bytes=10,
                                     enable_auto_commit=True)
    #dummy poll
    success_consumer.poll()
    for msg in success_consumer:
        print(msg)

    success_consumer.close()

有人可以指出 KafkaConsumer 的配置发生了哪些变化吗?为什么它无法读取“kafka-console-consumer”之类的消息?

python apache-kafka kafka-consumer-api
2个回答
1
投票

KafkaConsumer 类还有一个

fetch_max_wait_ms
参数。您应该将其设置为 0

success_consumer = KafkaConsumer(...,fetch_max_wait_ms=0)

0
投票

这对于OP来说可能为时已晚,但对于其他遇到此问题的人来说,添加像这样对我有用的超时:

consumer.poll(timeout_ms= 1000)
© www.soinside.com 2019 - 2024. All rights reserved.