我试图通过kafka传输一些巨大的json数据,但它没有发生。 python 生产者没有错误,消费者端也没有任何消息。
producer = KafkaProducer(bootstrap_servers="kafka_server")
product_message = json.dumps(data)
producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))
max.message.bytes
才能生成消息。
此外,您还可以通过获取发送返回的 future 来了解正在发生的情况:https://stackoverflow.com/a/55538034/19059974
我有同样的错误,通过阅读这个线程我已经解决了这样的问题:
假设您在 /usr/local/kafka/config/ 路径下有文件 server.properties 和 Consumer.properties
并且您希望将最大消息长度增加到 15728640 (15 MB)
sudo vim /usr/local/kafka/config/server.properties
添加行或更改变量值
# my additions
# increase max message size to 15 MB
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
sudo vim /usr/local/kafka/config/consumer.properties
添加行或更改变量值
# my additions
# increase max message size to 15 MB
message.max.bytes=15728640
max.partition.fetch.bytes=15728640
fetch.max.bytes=15728640
使更改生效(手动一一运行这些命令)
sudo systemctl daemon-reload # wait a little
sudo systemctl restart zookeeper # wait a little
sudo systemctl restart kafka # wait a little
通过运行确保它们正确重新启动
sudo systemctl status zookeeper
sudo systemctl status kafka
如果没有,请重新启动
然后你必须将 k 参数 max_request_size 添加到你的 KafkaProducer,所以替换行
producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))
有线
producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'), max_request_size=15728640)