我刚刚开始使用 Kafka,kafka-python。在下面的代码中,我尝试在消息到达时读取它们。但由于某种原因,消费者似乎要等到一定数量的消息积累后再获取它们。
我一开始还以为是制作方批量发布的原因。当我运行“kafka-console-consumer --bootstrap-servers --topic”时,我可以看到发布后立即收到的每条消息(如消费者控制台上所示)
但是Python脚本无法以同样的方式接收消息。
def run():
success_consumer = KafkaConsumer('success_logs',
bootstrap_servers=KAFKA_BROKER_URL,
group_id=None,
fetch_min_bytes=1,
fetch_max_bytes=10,
enable_auto_commit=True)
#dummy poll
success_consumer.poll()
for msg in success_consumer:
print(msg)
success_consumer.close()
有人可以指出 KafkaConsumer 的配置发生了哪些变化吗?为什么它无法读取“kafka-console-consumer”之类的消息?
KafkaConsumer 类还有一个
fetch_max_wait_ms
参数。您应该将其设置为 0
success_consumer = KafkaConsumer(...,fetch_max_wait_ms=0)
这对于OP来说可能为时已晚,但对于其他遇到此问题的人来说,添加像这样对我有用的超时:
consumer.poll(timeout_ms= 1000)