简单的 Kafka 生产者/消费者 - 消费者不消费所有消息

问题描述 投票:0回答:1

我只是在玩 Kafka,以便为未来的项目评估它。下面是两个 python 脚本,使用一些基于 kafka-python 库的示例代码。我试图弄清楚为什么当我发布来自生产者的 100 条消息时我得到 < 100 messages on the consumer.

  • 标准开箱即用的 Kafka 代理设置
  • 消费者(只有一个消费者)开始使用以下代码来消费来自“quickstart-events”主题的所有消息
  • 生产者将在屏幕上列出已发布的消息 msg0,msg1....msg99
  • 消费者只会报告可能消耗的 msg0、msg1...msg70,而不是完整的 100 个

需要注意的一件事是,如果我在发布者中放入一个小的 time.sleep(2/1000) 延迟,那么消费者将报告正在消费的所有 100 条消息。对我来说,这表明代理可能无法处理生产者发布的所有消息?对我来说,这似乎难以置信。请注意,当我创建主题时,我仅使用一个分区创建了它。

生产者输出样本:

对于生产者发布到主题的每条消息,运行并订阅同一主题的消费者进程都必须接收每条消息。

    # producer.py
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import time

    producer = KafkaProducer(bootstrap_servers=['msg1:9092'])

    # Asynchronous by default
    future = producer.send('quickstart-events', b'raw_bytes')

    for i in range(1,100):
        key = f"key{i}"
        msg = f"msg{i}"
        print(f"Publish message {msg}")
        future = producer.send(topic='quickstart-events', key=key.encode(), value=msg.encode())
        #putting in the 2ms sleep DOES allow for all messages to be received    
        #time.sleep(2/1000)
    # consumer.py
    from kafka import KafkaConsumer

    # To consume latest messages and auto-commit offsets
    consumer = KafkaConsumer('quickstart-events',
                         group_id='my-group',
                         bootstrap_servers=['msg1:9092'])

    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

正如您在此示例消息中看到的:缺少 msg93、msg94...msg99

# Publisher output
Publish message msg1
Publish message msg2
Publish message msg3
....
Publish message msg97
Publish message msg98
Publish message msg99

  # Consumer output
quickstart-events:0:3850: key=None value=b'raw_bytes'
quickstart-events:0:3851: key=b'key1' value=b'msg1'
quickstart-events:0:3852: key=b'key2' value=b'msg2'
quickstart-events:0:3853: key=b'key3' value=b'msg3'
.....
quickstart-events:0:3940: key=b'key90' value=b'msg90'
quickstart-events:0:3941: key=b'key91' value=b'msg91'
quickstart-events:0:3942: key=b'key92' value=b'msg92'
python apache-kafka
1个回答
0
投票

您需要在循环后调用 Producer.flush() ,否则脚本将退出,并在生产者缓冲区中保留待处理的记录

您还应该能够向生产者发送函数添加交付报告功能,而不是在记录实际发送并由代理确认之前进行记录

© www.soinside.com 2019 - 2024. All rights reserved.