我想从Kafka工具数据中的主题进行分析,该数据昨天插入了该主题。该主题包含超过600亿个数据。而且我只需要新数据。我有一个专注于时间戳的解析器,如果时间戳超过今天的数据,它将对其进行解析。但是需要很多时间。例如
for msg in consumer:
s = msg[3]
dt_object = datetime.datetime.fromtimestamp(s/1000)
date1 = dt_object.strftime('%Y-%m-%d %H:%M:%S')
date = datetime.datetime.strptime('2020-06-04 00:00:00', '%Y-%m-%d %H:%M:%S')
if dt_object > date:
print(date1)
num_rows = num_rows + 1
m = json.loads(msg.value)
您可以使用offsets_for_times来查找您要使用的最早时间戳的每个分区的偏移量,然后在使用之前使用Consumer的seek到该偏移量。
您可以在这里看到一个示例:https://github.com/confluentinc/confluent-kafka-python/issues/497