kafka-python:生产者无法连接

问题描述 投票:11回答:7

连接到代理时kafka-python(1.0.0)抛出错误。同时/ usr / bin / kafka-console-producer和/ usr / bin / kafka-console-consumer工作正常。

Python应用程序过去也运行良好,但在zookeeper重新启动后,它不再能够连接。

我正在使用文档中的裸骨示例:

from kafka import KafkaProducer
from kafka.common import KafkaError

producer = KafkaProducer(bootstrap_servers=['hostname:9092'])

# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')

我收到此错误:

Traceback (most recent call last):   File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])   File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored

单步执行(/usr/lib/python2.6/site-packages/kafka/client_async.py)时,我注意到第270行的计算结果为false:

270         if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271             if self._can_send_request(node_id):
272                 return True
273         return False

在我的情况下,self._metadata_refresh_in_progress为False,但ttl()= 0;

与此同时,kafka-console- *正在愉快地推送消息:

/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2

有什么建议?

apache-kafka kafka-python
7个回答
20
投票

我有同样的问题,上面的解决方案都没有奏效。然后我读取了异常消息,似乎必须指定api_version,所以

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,1,0))

注意:元组(1,0,0)匹配kafka版本1.0.0

工作正常(至少完成没有例外,现在必须说服它接受消息;))


8
投票

我遇到了类似的问题。就我而言,代理主机名在客户端无法解析。尝试在配置文件中显式设置advertised.host.name


3
投票

主机可以有多个dns别名。其中任何一个都适用于ssh或ping测试。但是,kafka连接应使用与代理的advertised.host.name文件中的server.properties匹配的别名。

我在bootstrap_servers参数中使用了不同的别名。因此出错了。一旦我改变使用advertised.hostname的电话,问题就解决了


2
投票

我有同样的问题。

我用提示user3503929解决了这个问题。

kafka服务器安装在Windows上。

server.properties

...
host.name = 0.0.0.0
...

.

producer = KafkaProducer(bootstrap_servers='192.168.1.3:9092',         
                                         value_serializer=str.encode)
producer.send('test', value='aaa')
producer.close()
print("DONE.")

在windows kafka客户端中处理没有问题。但是,当我在ubuntu中使用kafka-python向主题发送消息时,会引发NoBrokersAvailable异常。

将以下设置添加到server.properties。

...
advertised.host.name = 192.168.1.3
...

它在相同的代码中成功运行。因为这个我花了三个小时。

谢谢


1
投票

我有类似的问题,并从bootstrap_servers删除端口帮助。

consumer = KafkaConsumer('my_topic',
                     #group_id='x',
                     bootstrap_servers='kafka.com')

1
投票

使用pip install kafka-python安装kafka-python

创建kafka数据管道的步骤: - 1.使用shell命令运行Zookeeper或使用安装zookeeperd

sudo apt-get install zookeeperd 

这将作为守护进程运行zookeeper,默认情况下侦听2181端口

  1. 运行kafka服务器
  2. 在不同的控制台上使用producer.py和consumer.py运行脚本以查看实时数据。

以下是要运行的命令: -

cd kafka-directory
./bin/zookeeper-server-start.sh  ./config/zookeeper.properties    
./bin/kafka-server-start.sh  ./config/server.properties

现在您已经运行了zookeeper和kafka服务器,运行producer.py脚本和consumer.py

producer.朋友:

来自kafka导入KafkaProducer导入时间

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'test'
lines = ["1","2","3","4","5","6","7","8"]
for line in lines:
  try:
    producer.send(topic, bytes(line, "UTF-8")).get(timeout=10)
  except IndexError as e:
    print(e)
  continue

consumer.朋友:-

from kafka import KafkaConsumer
topic = 'test'
consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    # print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    #                                       message.offset, message.key,
    #                                       message.value))
    print(message)

现在在单独的终端中运行producer.py和consumer.py来查看实时数据..!

注意:上面的producer.py脚本只运行一次以永久运行它,使用while循环并使用时间模块。


0
投票

在server.properties文件中,确保将侦听器IP设置为远程计算机可访问的Box Ip地址。默认情况下它是localhost

在server.properties中更新此行:

listeners=PLAINTEXT://<Your-IP-address>:9092

另外,请确保您没有可能阻止其他IP地址与您联系的防火墙。如果你有sudo优先权。尝试禁用防火墙。

sudo systemctl stop firewalld
© www.soinside.com 2019 - 2024. All rights reserved.