我能够使用优秀的python-kafka
包来传播来自Kafka主题(运行OSX)的消息,例如:
from kafka import KafkaConsumer
consumer = KafkaConsumer('MyTopic',
group_id='alex',
bootstrap_servers=['kafka:9092'],
auto_offset_reset='largest')
for message in consumer:
print message.value
一位正在运行Windows 7的同事问我是否可以使用相同的方法进行流/过滤/警报。 “没问题”,我说。我们安装了Anaconda Python和Pycharm,并尝试传输一些消息。
不幸的是,在运行消费者之后的几分钟,显示以下消息:
File "C:\Users\[my_colleague]\AppData\Local\Continuum\Anaconda2\lib\site-packages\kafka\client_async.py", line 598, in connect
raise Errors.NodeNotReadyError(node_id)
kafka.common.NodeNotReadyError: None
我怀疑Windows防火墙阻止了消费者和代理之间的通信,因此我们简要地将Pycharm和python.exe添加到可以遍历防火墙的程序列表中。这没有用。
那时,我对问题有了一种归属感,在排除故障时我应该避免垄断他的笔记本电脑。鉴于Python,Kafka和Windows的普及,我想我们不是第一个面对这个问题的人。你能看出我们做错了什么吗?
这实际上是尝试自动探测代理版本时的错误。我们添加到python客户端是一种非标准功能,可以更容易地使用同一客户端在代理版本之间进行互操作。我的强烈建议是,一旦超过开发阶段,就应该使用api_version
参数明确传递代理版本。探测版本由kafka-python记录,以防您想要验证,但是从评论中听起来像这里的版本是0.9。所以在这里,
consumer = KafkaConsumer('MyTopic',
api_version=(0, 9),
group_id='alex',
bootstrap_servers=['kafka:9092'],
auto_offset_reset='largest')