仅从Kafka工具解析新数据

问题描述 投票:0回答:1

我想从Kafka工具数据中的主题进行分析,该数据昨天插入了该主题。该主题包含超过600亿个数据。而且我只需要新数据。我有一个专注于时间戳的解析器,如果时间戳超过今天的数据,它将对其进行解析。但是需要很多时间。例如

for msg in consumer:
    s = msg[3]
    dt_object = datetime.datetime.fromtimestamp(s/1000)
    date1 = dt_object.strftime('%Y-%m-%d %H:%M:%S')
    date = datetime.datetime.strptime('2020-06-04 00:00:00', '%Y-%m-%d %H:%M:%S')

    if dt_object > date:
        print(date1)
        num_rows = num_rows + 1
        m = json.loads(msg.value)

enter image description here

python apache-kafka
1个回答
0
投票

您可以使用offsets_for_times来查找您要使用的最早时间戳的每个分区的偏移量,然后在使用之前使用Consumer的seek到该偏移量。

您可以在这里看到一个示例:https://github.com/confluentinc/confluent-kafka-python/issues/497

© www.soinside.com 2019 - 2024. All rights reserved.