是否可以在 Kafka 控制台消费者中检索特定时间戳范围的消息?
例如昨天 08:00 到 09:00 之间的 kafka 消息。
kcat
来消费两个时间戳之间的消息:
kcat -b localhost:9092 -C -t mytopic -o s@1568276612443 -o e@1568276617901
哪里
s@
表示开始时间戳(以毫秒为单位)e@
表示以毫秒为单位的结束时间戳(不包含)是的,从Kafka版本0.10.1开始就可以这样做。
使用
KafkaConsumer中的函数
offsetsForTimes
:
按时间戳查找给定分区的偏移量。这 每个分区返回的偏移量是最早的偏移量 时间戳大于或等于给定时间戳 对应的分区。这是一个阻塞调用。消费者确实 不必分配分区。
参考:https://docs.confluence.io/platform/current/clients/confluence-kafka-python/html/index.html#id14
from confluent_kafka import Consumer,KafkaError,TopicPartition
import argparse
import os
broker_list = '10.100.110.35:9092'
group = 'kcat_consumer'
consumer_conf = {'bootstrap.servers': broker_list,
'group.id': group,
# 'auto.offset.reset': 'earliest'
'enable.auto.commit': False,
}
consumer = Consumer(consumer_conf)
partition = 0
timestamp_ms = 1618258800000
timestamp_offsets = {partition: timestamp_ms}
topic = 'hw-blacklist-1195'
tp = TopicPartition(topic, partition, timestamp_ms)
offsets = consumer.offsets_for_times([tp])
print(offsets[0].offset)
consumer.assign([TopicPartition(topic,0,offsets[0].offset)])
while True:
msg = consumer.poll(1.0,)
if not msg:
print(msg)
break
print(msg.value())