There are many ways to read/write a Spark dataframe to or from Kafka. I am trying to read messages from a Kafka topic and build a dataframe from them. I can pull messages from the topic, but I am unable to convert them into a dataframe. Any suggestion would be helpful.
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.context import SparkContext
from kafka import KafkaConsumer
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
consumer = KafkaConsumer('Jim_Topic')
for message in consumer:
    data = message
    print(data)  # printing the messages properly
    df = data.map  # am unable to convert it to a dataframe
Depending on your use case, you can do either of the following. For a streaming query:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "Jim_Topic")
  .load()

// Query data
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
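Since the question uses PySpark, here is roughly the same streaming read in Python. This is a minimal sketch assuming the spark-sql-kafka-0-10 connector is available to the session and the broker/topic names match the ones above:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Jim_Topic") \
    .load()

# key and value arrive as binary, so cast them to strings before querying
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# a streaming DataFrame needs a sink; writing to the console shows the messages
query = df.writeStream.format("console").start()
query.awaitTermination()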
For a batch query:
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "Jim_Topic")
  .load()

// Query data
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
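The batch equivalent in PySpark returns an ordinary DataFrame directly, which is what the question asks for. A minimal sketch under the same assumptions; for batch reads, startingOffsets defaults to "earliest" and endingOffsets to "latest", so the explicit options below simply read the whole topic:

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Jim_Topic") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# cast the binary key/value columns to strings and inspect the result
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.show(truncate=False)

Note that in both the streaming and the batch case the Kafka source comes from the external spark-sql-kafka-0-10 connector, so the job has to be launched with that package on the classpath (for example via the --packages option of spark-submit/pyspark), matching your Spark and Scala versions.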