如何消费kafka消息上的最后一条消息或者根据时间戳消费消息?

问题描述 投票:0回答:1
    def kafkaa(self, auto_offset_reset, timeout=500):
        group_name = "group name"
        config = {"bootstrap.servers": "server",
                           "schema.registry.url": "url",
                           "group.id": group_name,
                           "enable.auto.commit": False,
                           "auto.offset.reset": False, // True
                           "sasl.mechanisms": "sasl",
                           "security.protocol": "protocol",
                           "sasl.username": "username",
                           "sasl.password": "pw"}
        consumer = AvroConsumer(config)
        data_consumed = []
        consumer.subscribe(kafkaTopic)
        while True:
            if time.time() > time.time() + timeout:
            break
            else:
            message = consumer.poll()
            if message is not None:
                kafka_ms.append(message)
                consumer.commit(asynchronous=False)
        consumer.close()
        return data_consumed
    `

使用 auto.offset.reset =latest 时,如果未使用组 ID,则不会返回任何值,因为流并不总是有消息可供使用。

使用 auto.offset.reset =latest 时,组 id 为现有组 id,这会返回偏移后的所有内容,直到超时,但代理会重新启动

python apache-kafka kafka-consumer-api confluent-kafka-python
1个回答
0
投票

基于时间戳

您需要使用

offset_for_times
函数来查找该时间戳的任何主题的偏移量,然后在从该时间戳开始轮询之前向使用者查找这些分区偏移量

© www.soinside.com 2019 - 2024. All rights reserved.