如何开始实时消费来自 Kafka 的消息而不是最新的偏移量

问题描述 投票:0回答:1

我正在使用 Kafka 和 Debezium 从数据库捕获行相关事件,它按预期工作

在 Python 中,我设置了一个消费者来处理来自主题的 JSON 消息

# Kafka consumer configuration
config = {
    'bootstrap.servers': 'my-ip:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'latest'  
}

# Create the Consumer instance
consumer = Consumer(config)

# Subscribe to the topic
topic = 'my-ip.Database.Table'
consumer.subscribe([topic])

它工作得很好,问题是我需要这个消费者只关心最新的消息

我可以处理每条消息并根据时间戳丢弃内容,但我宁愿从正确的偏移量开始消费

这是我用来处理消息的循环

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            print("No new messages.")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print("End of partition reached.")
                continue
            else:
                print(f"Error: {msg.error()}")
                break

        message = json.loads(msg.value().decode("utf-8"))
        payload = message.get("payload", {})
        operation = payload.get("op")  # c: create/insert, u: update, d: delete

        match operation:
            case "c":
                print("Processing insert operation.")
                process_message(payload["after"], "Insert")
            case "u":
                print("Processing update operation.")
                process_message(payload["after"], "Update")
            case "d":
                print("Delete operation detected, not processing.")
            case _:
                print(f"Unhandled operation type: {operation}")

except KeyboardInterrupt:
    print("Exiting consumer...")
finally:
    consumer.close()
    print("Consumer closed.")

如何将偏移量从我拥有的这个消费者代码移到末尾?谢谢!

python apache-kafka kafka-consumer-api
1个回答
0
投票

您可以在创建消费者时选择设置

auto_offset_reset
。它已经默认为最新。

如果您想在处理时跳到主题末尾,请使用消费者的搜索功能

© www.soinside.com 2019 - 2024. All rights reserved.