使用offsets_for_times从时间戳消费

问题描述 投票:0回答:2

尝试使用 confluence_kafka.AvroConsumer 来消费给定时间戳的消息。

if flag:

    # creating a list
    topic_partitons_to_search = list(
        map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))

    print("Searching for offsets with %s" % topic_partitons_to_search)
    offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0)
    print("offsets_for_times results: %s" % offsets)

    for x in offsets:
        c.seek(x)
    flag=False 

控制台返回这个

Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]
offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]
{'name': 'Hello'}
{'name': 'Hello'}
{'name': 'Hello1'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Offset 8'}
{'name': 'Offset 9'}
{'name': 'Offset 10'}
{'name': 'Offset 11'}
{'name': 'New'} 

这些是 my_topic2 的分区 0 中的所有消息(分区 1 中没有任何消息),我们应该什么也没有返回,因为我们没有从当前时间 (time.time()) 生成任何消息。然后我希望能够使用类似

time.time() - 60000
的东西来获取过去 60000 毫秒内的所有消息

python apache-kafka kafka-consumer-api confluent-kafka-python
2个回答
3
投票

Pythons time.time() 返回自纪元以来的秒数,offsets_for_times 使用纪元以来的毫秒数,因此当我以秒数发送时,它计算的日期比今天早得多,这意味着我们应该包括我所有的偏移量。


1
投票

您可以手动分配偏移量,而不是 c.seek

for p, o in zip(topic_partitons_to_search, offsets):
   p.offset = o.offset
consumer.assign(topic_partitons_to_search)

您可以使用 datetime+timedelta 并将其转换为时间戳,而不是使用 time.time() - 60000 之类的东西

from datetime import datetime, timedelta
from_date = (datetime.now()) - timedelta(days=1) # e.g. 1 day
from_date_ts = int(from_date.timestamp() * 1000)  # millisecond timestamps

topic_partitons_to_search = list(map(lambda p: TopicPartition('my_topic2', p, from_date_ts), range(0, 1))))

(使用 isoformat 而不是 datetime+timedelta 请参阅如何使用 confluence-kafka-python 消费最近 N 天的消息?

© www.soinside.com 2019 - 2024. All rights reserved.