PySpark: Writing a Spark DataFrame to a Kafka topic

Question · Votes: 0 · Answers: 1

I'm trying to load a DataFrame into a Kafka topic, and I get an error when selecting the key and value. Any suggestions would be helpful.

Here is my code:

data = spark.sql('select * from job')

kafka = data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream.outputMode('append').format('kafka')\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("topic", "Jim_Topic")\
    .option("checkpointLocation", "C:/Hadoop/Data/CheckPointLocation/")\
    .start()

kafka.awaitTermination()

And here is the error:

pyspark.sql.utils.AnalysisException: cannot resolve '`key`' given input columns: [job.JOB_ID, 
job.JOB_TITLE, job.MAX_SALARY, job.MIN_SALARY]; line 1 pos 5;
'Project [unresolvedalias(cast('key as string), None), unresolvedalias(cast('value as string), None)]
+- Project [JOB_ID#0, JOB_TITLE#1, MIN_SALARY#2, MAX_SALARY#3]
   +- SubqueryAlias `job`
      +- StreamingRelation

DataSource(org.apache.spark.sql.SparkSession@1f3fc47a,csv,List(),Some(StructType(StructField(JOB_ID,StringType,true), StructField(JOB_TITLE,StringType,true), StructField(MIN_SALARY,StringType,true), StructField(MAX_SALARY,StringType,true))),List(),None,Map(sep -> ,, header -> false, path -> C:/Hadoop/Data/Job*.csv),None), FileSource[C:/Hadoop/Data/Job*.csv], [JOB_ID#0, JOB_TITLE#1, MIN_SALARY#2, MAX_SALARY#3]

pyspark apache-kafka spark-streaming
1 Answer · 0 votes

Converting the value to JSON worked fine; messages can now be sent from the Spark stream to Kafka. The Kafka sink looks for columns named key and value, and the job view has neither, which is what the analysis error is complaining about. Only the selectExpr line needs to change:

kafka = data.selectExpr("CAST(JOB_ID AS STRING) AS key", "to_json(struct(*)) AS value")
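For reference, here is a minimal end-to-end sketch of the corrected pipeline. The schema, CSV options, paths, and topic name are taken from the error output in the question; the SparkSession setup and the spark-sql-kafka package coordinates are assumptions and must match your Spark build.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Assumes the Kafka sink package is on the classpath, e.g.:
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 app.py
spark = SparkSession.builder.appName('csv-to-kafka').getOrCreate()

# Schema as shown in the StreamingRelation of the error message
schema = StructType([
    StructField('JOB_ID', StringType(), True),
    StructField('JOB_TITLE', StringType(), True),
    StructField('MIN_SALARY', StringType(), True),
    StructField('MAX_SALARY', StringType(), True),
])

# Stream the CSV files and register them as the 'job' view used by spark.sql
spark.readStream.schema(schema)\
    .option('sep', ',').option('header', 'false')\
    .csv('C:/Hadoop/Data/Job*.csv')\
    .createOrReplaceTempView('job')

data = spark.sql('select * from job')

# The Kafka sink requires a 'value' column (and optionally 'key'), both
# castable to string/binary; serialize each row to JSON, keyed by JOB_ID
kafka = data.selectExpr("CAST(JOB_ID AS STRING) AS key", "to_json(struct(*)) AS value")\
    .writeStream.outputMode('append').format('kafka')\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("topic", "Jim_Topic")\
    .option("checkpointLocation", "C:/Hadoop/Data/CheckPointLocation/")\
    .start()

kafka.awaitTermination()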