I am writing Kafka messages to a file under the Hive warehouse location, and I also want to write the system_date into each record along with the message.
from kafka import TopicPartition
from pyhive import hive

# consumer is an existing KafkaConsumer; mytopic and mytopicpartitions are defined elsewhere
topicPartition = [TopicPartition(mytopic, p) for p in range(0, mytopicpartitions)]
consumer.assign(topicPartition)
consumer.seek_to_beginning()  # with no arguments, rewinds every assigned partition
print("start hive connection")
conn = hive.Connection(host="xxxx", port=xx, username="xx", password="xx", auth="CUSTOM")
print("end hive connection")
with open("/location/kafkafeed.txt", "w") as f:
    # Reading messages from the Kafka topic and writing each one to the file
    for msg in consumer:
        # Extracting data in JSON format
        payload = msg.value.decode('utf-8')
        f.write(payload + "\t")
        print("writing into a file")
How can I write the payload and the date together?
You only have a string, not a dictionary that you can add a date to.
You need to parse your JSON record first. For example:
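import json

with open("/location/kafkafeed.txt", "w") as f:  # the file is closed when the block exits
    for msg in consumer:
        # Parse the JSON record into a dict so that a field can be added
        payload = json.loads(msg.value.decode('utf-8'))
        # msg.timestamp is the Kafka record timestamp (milliseconds since the epoch)
        payload["date"] = msg.timestamp
        try:
            # Write one JSON object per line
            f.write(json.dumps(payload) + "\n")
        except OSError:
            break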
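If what you want is the system date of the consumer host rather than the Kafka record timestamp, the parse-and-add step can be pulled into a small helper. This is only a sketch under assumptions: the message values are assumed to be UTF-8 JSON objects, the field name `system_date` is taken from the question, and the helper names `add_system_date` and `kafka_ts_to_date` are hypothetical. It writes one JSON object per line, which assumes the Hive table reads the file with a JSON SerDe; that table layout is not stated in the question.

```python
import json
from datetime import date, datetime, timezone
from typing import Optional


def add_system_date(raw_value: bytes, today: Optional[date] = None) -> str:
    """Parse one Kafka message value (UTF-8 JSON) and add a system_date field.

    Returns a single JSON line ending in a newline, i.e. one record per line.
    """
    record = json.loads(raw_value.decode("utf-8"))
    if not isinstance(record, dict):
        # Only a JSON object (dict) can take an extra field
        raise ValueError("expected a JSON object")
    # Current date on this host, formatted as YYYY-MM-DD (Hive's DATE format)
    record["system_date"] = (today or date.today()).isoformat()
    return json.dumps(record) + "\n"


def kafka_ts_to_date(timestamp_ms: int) -> str:
    """Convert a Kafka record timestamp (ms since epoch) to a UTC YYYY-MM-DD string."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).date().isoformat()


if __name__ == "__main__":
    # Fixed date so the output is reproducible
    line = add_system_date(b'{"id": 1}', today=date(2023, 5, 17))
    print(line, end="")  # {"id": 1, "system_date": "2023-05-17"}
    print(kafka_ts_to_date(0))  # 1970-01-01
```

Inside the consumer loop this would be used as `f.write(add_system_date(msg.value))`. Passing `today` explicitly keeps the helper testable. `kafka_ts_to_date` converts in UTC, so near midnight its result can differ from the local calendar date.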