我有一个从Spark生成的数据框,我想将其用于writeStream
,也想保存在数据库中。
我有以下代码:
output = (
spark_event_df
.writeStream
.outputMode('update')
.foreach(writerClass(**job_config_data))
.trigger(processingTime="2 seconds")
.start()
)
output.awaitTermination()
当我使用foreach()
时,writerClass
获得了Row
,因此我无法将其转换为python中的字典。
如何从Row
中的writerClass
获取python数据类型(最好是字典),以便我可以根据需要进行操作并将其保存到数据库中?
[如果您只是想将其保存为流的一部分,则可以使用foreachBatch
和内置的JDBC编写器来完成。只需进行转换即可根据所需的输出模式来塑造数据,然后:
foreachBatch
如果绝对需要自定义逻辑来写入数据库,而内置JDBC编写器不支持该逻辑,则应使用DataFrame def writeBatch(input, batch_id):
(input
.write
.format("jdbc")
.option("url", url)
.option("dbtable", tbl)
.mode("append")
.save())
output = (spark_event_df
.writeStream
.foreachBatch(writeBatch)
.start())
output.awaitTermination()
方法批量写入行,而不是一次写入一行。如果使用此方法,则只需调用foreachPartition