连接到代理时kafka-python(1.0.0)抛出错误。同时/ usr / bin / kafka-console-producer和/ usr / bin / kafka-console-consumer工作正常。
Python应用程序过去也运行良好,但在zookeeper重新启动后,它不再能够连接。
我正在使用文档中的裸骨示例:
from kafka import KafkaProducer
from kafka.common import KafkaError
producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
我收到此错误:
Traceback (most recent call last): File "pp.py", line 4, in <module>
producer = KafkaProducer(bootstrap_servers=['hostname:9092']) File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
self.config['api_version'] = client.check_version() File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
connect(node_id) File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored
单步执行(/usr/lib/python2.6/site-packages/kafka/client_async.py)时,我注意到第270行的计算结果为false:
270 if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271 if self._can_send_request(node_id):
272 return True
273 return False
在我的情况下,self._metadata_refresh_in_progress为False,但ttl()= 0;
与此同时,kafka-console- *正在愉快地推送消息:
/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2
有什么建议?
我有同样的问题,上面的解决方案都没有奏效。然后我读取了异常消息,似乎必须指定api_version,所以
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,1,0))
注意:元组
(1,0,0)
匹配kafka版本1.0.0
工作正常(至少完成没有例外,现在必须说服它接受消息;))
我遇到了类似的问题。就我而言,代理主机名在客户端无法解析。尝试在配置文件中显式设置advertised.host.name
。
主机可以有多个dns别名。其中任何一个都适用于ssh或ping测试。但是,kafka连接应使用与代理的advertised.host.name
文件中的server.properties
匹配的别名。
我在bootstrap_servers
参数中使用了不同的别名。因此出错了。一旦我改变使用advertised.hostname
的电话,问题就解决了
我有同样的问题。
我用提示user3503929解决了这个问题。
kafka服务器安装在Windows上。
server.properties
...
host.name = 0.0.0.0
...
.
producer = KafkaProducer(bootstrap_servers='192.168.1.3:9092',
value_serializer=str.encode)
producer.send('test', value='aaa')
producer.close()
print("DONE.")
在windows kafka客户端中处理没有问题。但是,当我在ubuntu中使用kafka-python向主题发送消息时,会引发NoBrokersAvailable
异常。
将以下设置添加到server.properties。
...
advertised.host.name = 192.168.1.3
...
它在相同的代码中成功运行。因为这个我花了三个小时。
谢谢
我有类似的问题,并从bootstrap_servers删除端口帮助。
consumer = KafkaConsumer('my_topic',
#group_id='x',
bootstrap_servers='kafka.com')
使用pip install kafka-python
安装kafka-python
创建kafka数据管道的步骤: - 1.使用shell命令运行Zookeeper或使用安装zookeeperd
sudo apt-get install zookeeperd
这将作为守护进程运行zookeeper,默认情况下侦听2181端口
以下是要运行的命令: -
cd kafka-directory
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
现在您已经运行了zookeeper和kafka服务器,运行producer.py脚本和consumer.py
producer.朋友:
来自kafka导入KafkaProducer导入时间
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'test'
lines = ["1","2","3","4","5","6","7","8"]
for line in lines:
try:
producer.send(topic, bytes(line, "UTF-8")).get(timeout=10)
except IndexError as e:
print(e)
continue
consumer.朋友:-
from kafka import KafkaConsumer
topic = 'test'
consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
# print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
# message.offset, message.key,
# message.value))
print(message)
现在在单独的终端中运行producer.py和consumer.py来查看实时数据..!
注意:上面的producer.py脚本只运行一次以永久运行它,使用while循环并使用时间模块。
在server.properties文件中,确保将侦听器IP设置为远程计算机可访问的Box Ip地址。默认情况下它是localhost
在server.properties中更新此行:
listeners=PLAINTEXT://<Your-IP-address>:9092
另外,请确保您没有可能阻止其他IP地址与您联系的防火墙。如果你有sudo优先权。尝试禁用防火墙。
sudo systemctl stop firewalld