使用Python,如何生成大于1MB的kafka消息?

问题描述 投票:0回答:2

我试图通过kafka传输一些巨大的json数据,但它没有发生。 python 生产者没有错误,消费者端也没有任何消息。

producer = KafkaProducer(bootstrap_servers="kafka_server")
product_message = json.dumps(data)
producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))
python apache-kafka kafka-producer-api producer-consumer producer
2个回答
1
投票

您可能需要增加主题配置

max.message.bytes
才能生成消息。

此外,您还可以通过获取发送返回的 future 来了解正在发生的情况:https://stackoverflow.com/a/55538034/19059974


0
投票

我有同样的错误,通过阅读这个线程我已经解决了这样的问题:

假设您在 /usr/local/kafka/config/ 路径下有文件 server.properties 和 Consumer.properties

并且您希望将最大消息长度增加到 15728640 (15 MB)

服务器端(=经纪人)

sudo vim /usr/local/kafka/config/server.properties

添加行或更改变量值

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

消费端

sudo vim /usr/local/kafka/config/consumer.properties

添加行或更改变量值

# my additions
# increase max message size to 15 MB
message.max.bytes=15728640
max.partition.fetch.bytes=15728640
fetch.max.bytes=15728640

重启服务

使更改生效(手动一一运行这些命令)

sudo systemctl daemon-reload  # wait a little

sudo systemctl restart zookeeper  # wait a little

sudo systemctl restart kafka  # wait a little

通过运行确保它们正确重新启动

sudo systemctl status zookeeper
sudo systemctl status kafka

如果没有,请重新启动

生产者端

然后你必须将 k 参数 max_request_size 添加到你的 KafkaProducer,所以替换行

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'))

有线

producer.send(kafka_topic, key=bytes('kk', 'utf-8'), value=bytes(product_message, 'utf-8'), max_request_size=15728640)
© www.soinside.com 2019 - 2024. All rights reserved.