我正在使用 Kafka 和 Debezium 从数据库捕获行相关事件,它按预期工作
在 Python 中,我设置了一个消费者来处理来自主题的 JSON 消息
# Kafka consumer configuration
config = {
'bootstrap.servers': 'my-ip:9092',
'group.id': 'my-group',
'auto.offset.reset': 'latest'
}
# Create the Consumer instance
consumer = Consumer(config)
# Subscribe to the topic
topic = 'my-ip.Database.Table'
consumer.subscribe([topic])
它工作得很好,问题是我需要这个消费者只关心最新的消息
我可以处理每条消息并根据时间戳丢弃内容,但我宁愿从正确的偏移量开始消费
这是我用来处理消息的循环
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
print("No new messages.")
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached.")
continue
else:
print(f"Error: {msg.error()}")
break
message = json.loads(msg.value().decode("utf-8"))
payload = message.get("payload", {})
operation = payload.get("op") # c: create/insert, u: update, d: delete
match operation:
case "c":
print("Processing insert operation.")
process_message(payload["after"], "Insert")
case "u":
print("Processing update operation.")
process_message(payload["after"], "Update")
case "d":
print("Delete operation detected, not processing.")
case _:
print(f"Unhandled operation type: {operation}")
except KeyboardInterrupt:
print("Exiting consumer...")
finally:
consumer.close()
print("Consumer closed.")
如何将偏移量从我拥有的这个消费者代码移到末尾?谢谢!
您可以在创建消费者时选择设置
auto_offset_reset
。它已经默认为最新。
如果您想在处理时跳到主题末尾,请使用消费者的搜索功能