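I am trying to load a DataFrame into a Kafka topic, but I get an error when selecting the key and value. Any suggestions would be helpful.
Here is my code: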
data = spark.sql('select * from job')
kafka = data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream.outputMode(outputMode='Append').format('kafka')\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("topic", "Jim_Topic")\
    .option("checkpointLocation", "C:/Hadoop/Data/CheckPointLocation/")\
    .start()
kafka.awaitTermination()
Here is the error:
pyspark.sql.utils.AnalysisException: cannot resolve '`key`' given input columns: [job.JOB_ID,
job.JOB_TITLE, job.MAX_SALARY, job.MIN_SALARY]; line 1 pos 5;
'Project [unresolvedalias(cast('key as string), None), unresolvedalias(cast('value as string), None)]
+- Project [JOB_ID#0, JOB_TITLE#1, MIN_SALARY#2, MAX_SALARY#3]
+- SubqueryAlias `job`
+- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@1f3fc47a,csv,List(),Some(StructType(StructField(JOB_ID,StringType,true), StructField(JOB_TITLE,StringType,true), StructField(MIN_SALARY,StringType,true), StructField(MAX_SALARY,StringType,true))),List(),None,Map(sep -> ,, header -> false, path -> C:/Hadoop/Data/Job*.csv),None), FileSource[C:/Hadoop/Data/Job*.csv], [JOB_ID#0, JOB_TITLE#1, MIN_SALARY#2, MAX_SALARY#3]
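The exception says that `job` has only the columns JOB_ID, JOB_TITLE, MIN_SALARY and MAX_SALARY, so `CAST(key AS STRING)` and `CAST(value AS STRING)` have nothing to resolve against. The Kafka sink needs a `value` column (and optionally a `key`), so those columns have to be built from the existing ones. As a rough illustration of the record shape, here is a plain-Python sketch that needs no Spark and mirrors a projection such as `CAST(JOB_ID AS STRING) AS key, to_json(struct(*)) AS value`. The helper name `to_kafka_record` and the sample row are made up for illustration; only the column names come from the error message.

```python
import json

# Column names taken from the error message above.
COLUMNS = ["JOB_ID", "JOB_TITLE", "MIN_SALARY", "MAX_SALARY"]


def to_kafka_record(row):
    """Hypothetical helper: build the key/value pair a Kafka sink expects.

    Roughly mirrors: CAST(JOB_ID AS STRING) AS key, to_json(struct(*)) AS value
    """
    key = str(row["JOB_ID"])
    # Serialize every column into one compact JSON string.
    value = json.dumps({c: row[c] for c in COLUMNS}, separators=(",", ":"))
    return {"key": key, "value": value}


# Made-up sample row; the real data comes from C:/Hadoop/Data/Job*.csv.
rows = [
    {
        "JOB_ID": "AD_PRES",
        "JOB_TITLE": "President",
        "MIN_SALARY": "20000",
        "MAX_SALARY": "40000",
    },
]

records = [to_kafka_record(r) for r in rows]
print(records[0])
```

Each resulting record has exactly the two fields the sink looks for, which is what the original `select *` output was missing.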
I tried converting the value to JSON, and that worked. I can now send messages from Spark streaming to Kafka:
kafka = data.selectExpr("CAST(JOB_ID AS STRING) AS key", "to_json(struct(*)) AS value")\