如何使用Kafka Console Consumer消费两个时间戳之间的消息

问题描述 投票:0回答:3

是否可以在 Kafka 控制台消费者中检索特定时间戳范围的消息?

例如昨天 08:00 到 09:00 之间的 kafka 消息。

apache-kafka kafka-consumer-api
3个回答
18
投票

您可以使用

kcat
来消费两个时间戳之间的消息:

kcat -b localhost:9092 -C -t mytopic -o s@1568276612443 -o e@1568276617901

哪里

  • s@
    表示开始时间戳(以毫秒为单位)
  • e@
    表示以毫秒为单位的结束时间戳(不包含)

3
投票

是的,从Kafka版本0.10.1开始就可以这样做。
使用

KafkaConsumer
中的函数offsetsForTimes

按时间戳查找给定分区的偏移量。这 每个分区返回的偏移量是最早的偏移量 时间戳大于或等于给定时间戳 对应的分区。这是一个阻塞调用。消费者确实 不必分配分区。


0
投票

参考:https://docs.confluence.io/platform/current/clients/confluence-kafka-python/html/index.html#id14

from confluent_kafka import Consumer,KafkaError,TopicPartition
import argparse
import os

broker_list = '10.100.110.35:9092'
group = 'kcat_consumer'

consumer_conf = {'bootstrap.servers': broker_list,
                     'group.id': group,
                     #  'auto.offset.reset': 'earliest'
                     'enable.auto.commit': False,
                     }
consumer = Consumer(consumer_conf)

partition = 0
timestamp_ms = 1618258800000
timestamp_offsets = {partition: timestamp_ms}
topic = 'hw-blacklist-1195'
tp = TopicPartition(topic, partition, timestamp_ms)
offsets = consumer.offsets_for_times([tp])
print(offsets[0].offset)

consumer.assign([TopicPartition(topic,0,offsets[0].offset)])
while True:
    msg = consumer.poll(1.0,)
    if not msg:
        print(msg)
        break
    
    print(msg.value())
© www.soinside.com 2019 - 2024. All rights reserved.