我有一个 python 代码,它消耗来自 Kafka 主题的消息。我想将这些消息存储到 hive 表中。
import ssl
from kafka import KafkaConsumer, TopicPartition
from pyhive import hive
ssl.match_hostname = lambda cert, hostname: True
mytopic='xyz'
mygroupid='ABC'
mytopicpartitions = 10
print("start kafka connection")
consumer = KafkaConsumer(bootstrap_servers='xxx:xx', group_id=mygroupid, security_protocol='SSL',s ssl_check_hostname=True, ssl_cafile='xx.pem', ssl_certfile='certificate.pem', ssl_keyfile='key.pem', auto_offset_reset='latest', consumer_timeout_ms=60000)
print("end kafka connection")
# Set topic and partition
topicPartition=[TopicPartition(mytopic, p) for p in range(0,mytopicpartitions)]
consumer.assign(topicPartition)
consumer.seek_to_beginning()
conn = hive.Connection(host="00000", port=1111, username="user",password="pass",auth="CUSTOM")
cursor = conn.cursor()
query=("insert into table sample values('{}')".format(consumer))
cursor.execute(query)
错误:正在加载对象
消费者变量不是记录。您需要一个 for 循环来从 Kafka 轮询数据。您还应该订阅该主题,而不是手动将分区分配给消费者。
或者,使用 PyFlink、Pyspark 或 Kafka Connect 来填充 Hive,而不是普通的 pyhive 连接