I'm just playing with Kafka to evaluate it for a future project. Below are two Python scripts based on sample code from the kafka-python library. I'm trying to figure out why, when I publish 100 messages from the producer, I get fewer than 100 messages on the consumer.
One thing to note: if I put a small time.sleep(2/1000) delay in the publisher, the consumer reports all 100 messages being consumed. To me this suggests the broker may not be able to keep up with everything the producer publishes? That seems hard to believe. Note that I created the topic with only one partition.
My expectation: for every message the producer publishes to the topic, a consumer process running and subscribed to the same topic should receive that message.
# producer.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time
producer = KafkaProducer(bootstrap_servers=['msg1:9092'])
# Asynchronous by default
future = producer.send('quickstart-events', b'raw_bytes')
for i in range(1, 100):
    key = f"key{i}"
    msg = f"msg{i}"
    print(f"Publish message {msg}")
    future = producer.send(topic='quickstart-events', key=key.encode(), value=msg.encode())
    # putting in the 2ms sleep DOES allow for all messages to be received
    # time.sleep(2/1000)
# consumer.py
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('quickstart-events',
                         group_id='my-group',
                         bootstrap_servers=['msg1:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
As you can see in the sample output below, msg93, msg94 ... msg99 are missing:
# Publisher output
Publish message msg1
Publish message msg2
Publish message msg3
....
Publish message msg97
Publish message msg98
Publish message msg99
# Consumer output
quickstart-events:0:3850: key=None value=b'raw_bytes'
quickstart-events:0:3851: key=b'key1' value=b'msg1'
quickstart-events:0:3852: key=b'key2' value=b'msg2'
quickstart-events:0:3853: key=b'key3' value=b'msg3'
.....
quickstart-events:0:3940: key=b'key90' value=b'msg90'
quickstart-events:0:3941: key=b'key91' value=b'msg91'
quickstart-events:0:3942: key=b'key92' value=b'msg92'
You need to call producer.flush() after the loop; otherwise the script exits with pending records still sitting in the producer's buffer. kafka-python's send() is asynchronous: it only appends the record to an in-memory buffer, and a background thread transmits batches to the broker. If the process exits first, whatever is still buffered is simply lost, which is also why the 2 ms sleep "fixes" it: the sleep gives the background thread time to drain the buffer.
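A minimal sketch of the corrected publish loop (same kafka-python library and topic name as in the question; the `publish_all` helper name is mine). Taking the producer as a parameter keeps the flush-on-exit behavior explicit:

```python
def publish_all(producer, topic, count):
    """Send `count` keyed messages through `producer`, then flush.

    flush() blocks until every buffered record has been transmitted and
    acknowledged; without it the interpreter can exit with records still
    sitting in the producer's internal buffer.
    """
    for i in range(1, count + 1):
        producer.send(topic, key=f"key{i}".encode(), value=f"msg{i}".encode())
    producer.flush()  # the fix: drain the buffer before the script ends

if __name__ == "__main__":
    # Requires a reachable broker; bootstrap server taken from the question.
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=["msg1:9092"])
    publish_all(producer, "quickstart-events", 100)
```

Calling producer.close() would also flush as part of shutdown, but an explicit flush() makes the intent obvious.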
You could also attach a delivery-report callback to the future returned by the producer's send() call, instead of logging before the record has actually been sent and acknowledged by the broker.