我正在使用 Kafka 将日志发送到某个主题。发送消息时,我总是收到此错误
Message: 'test log'
Arguments: ()
--- Logging error ---
Traceback (most recent call last):
File "/home/ubuntu/applications/pythonapp/kafka_handler.py", line 53, in emit
self.flush(timeout=1.0)
File "/home/ubuntu/applications/pythonapp/kafka_handler.py", line 59, in flush
self.producer.flush(timeout=timeout)
File "/home/ubuntu/env_revamp/lib/python3.8/site-packages/kafka/producer/kafka.py", line 649, in flush
self._accumulator.await_flush_completion(timeout=timeout)
File "/home/ubuntu/env_revamp/lib/python3.8/site-packages/kafka/producer/record_accumulator.py", line 529, in await_flush_completion
raise Errors.KafkaTimeoutError('Timeout waiting for future')
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
我尝试了一些解决方案,例如注释掉该行
self.flush(timeout=1.0)
我也尝试将超时值增加到 5.0 但没有成功。
我的堆栈:
Python 3.8.10
pip 包列表
alembic==1.8.1
certifi==2022.9.24
charset-normalizer==2.1.1
click==8.1.3
dataclasses==0.6
Flask==2.2.2
Flask-Cors==3.0.10
Flask-Migrate==4.0.0
Flask-SQLAlchemy==3.0.2
graypy==2.1.0
greenlet==2.0.1
idna==3.4
importlib-metadata==5.0.0
importlib-resources==5.10.0
itsdangerous==2.1.2
Jinja2==3.1.2
kafka-python==2.0.2
logging-gelf==0.0.26
Mako==1.2.4
MarkupSafe==2.1.1
marshmallow==3.19.0
numpy==1.24.2
packaging==21.3
pip==20.0.2
PyMySQL==1.0.2
pyparsing==3.0.9
requests==2.28.1
setuptools==44.0.0
six==1.16.0
SQLAlchemy==1.4.44
urllib3==1.26.12
Werkzeug==2.2.2
zipp==3.10.0
有人知道这个错误是什么以及我们如何解决这个问题吗?
我对 acks KafkaProducer 参数等于 0 或 1 也有同样的错误。如果我使用 acks = 'all' 我就没有错误。
my stack :
Python 3.11.7
Package Version
------------ -------
kafka-python 2.0.2
pip 23.3.2
setuptools 69.0.2
wheel 0.42.0
kafka version 3.6.1