即使使用“auto.offset.reset”也不会收到旧消息:“最早”

问题描述 投票:0回答:1

我正在尝试使用 Confluent Kafka 在 python 中编写一个 kafka 消费者。我可以收到所有新消息,但如果我杀死并重新启动我的消费者,我就不会收到任何旧消息

def confluent_kafka_consumer(app):
    with app.app_context():
        import config
    app.logger.info('Running Confluent Kafka consumer')

    consumer_config = {
        'bootstrap.servers': F'{config.Config.KAFKA_BROKER_URL}:{config.Config.KAFKA_BROKER_PORT}',
        'group.id': 'myGroupId',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': 'false',
        'max.poll.interval.ms': '86400000'
    }

    try:

        consumer = Consumer(consumer_config)
        consumer.subscribe(['updates'])
        
        while True:

            # read single message at a time
            msg = consumer.poll(0)

            if msg is None:
                gevent.sleep(config.DevelopmentConfig.KAFKA_CONSUMER_THREAD_SLEEP_TIME)
                continue
            if msg.error():
                print("Error reading message : {}".format(msg.error()))
                continue
            # You can parse message and save to data base here
            callstr = msg.value().decode('utf-8')
            print(callstr)

    except Exception as ex:
        print("Kafka Exception : {}", ex)

    finally:
        print("closing consumer")
        consumer.close()

我尝试将 groupId 设置为不同的东西,重新启动生产者、zookeper ...

python apache-kafka kafka-consumer-api confluent-kafka-python
1个回答
0
投票

您是否在第一次运行代码时获得了现有数据?如果是这样,这就是消费者群体的工作方式;他们在重新启动时保持位置。

您将需要一个新的 group.id,或者您可以在外部使用

kafka-consumer-groups
CLI 命令(您需要下载 Kafka)来重置偏移量。

或者,生产者代码可能只是丢失数据。例如,您是否使用

kafka-console-consumer
看到了您期望的数据?重启任何东西都无济于事。

© www.soinside.com 2019 - 2024. All rights reserved.