如何将JSON加载到hive表中?

问题描述 投票:0回答:1

我有一个 python 代码,它消耗来自 Kafka 主题的消息。我想将这些消息存储到 hive 表中。

import ssl
from kafka import KafkaConsumer, TopicPartition
from pyhive import hive

ssl.match_hostname = lambda cert, hostname: True
mytopic='xyz'
mygroupid='ABC'
mytopicpartitions = 10

print("start kafka connection")

consumer = KafkaConsumer(bootstrap_servers='xxx:xx', group_id=mygroupid, security_protocol='SSL',s ssl_check_hostname=True, ssl_cafile='xx.pem', ssl_certfile='certificate.pem', ssl_keyfile='key.pem', auto_offset_reset='latest', consumer_timeout_ms=60000)

print("end kafka connection")

# Set topic and partition
topicPartition=[TopicPartition(mytopic, p) for p in range(0,mytopicpartitions)]
consumer.assign(topicPartition)
consumer.seek_to_beginning()

conn = hive.Connection(host="00000", port=1111, username="user",password="pass",auth="CUSTOM")

cursor = conn.cursor()

query=("insert into table sample values('{}')".format(consumer))
cursor.execute(query)

错误:正在加载对象 不是实际数据。

python apache-kafka hive consumer
1个回答
0
投票

消费者变量不是记录。您需要一个 for 循环来从 Kafka 轮询数据。您还应该订阅该主题,而不是手动将分区分配给消费者。

或者,使用 PyFlink、Pyspark 或 Kafka Connect 来填充 Hive,而不是普通的 pyhive 连接

© www.soinside.com 2019 - 2024. All rights reserved.