如何在pyspark中的foreach()中将行转换为字典?

问题描述 投票:2回答:1

我有一个从Spark生成的数据框,我想将其用于writeStream,也想保存在数据库中。

我有以下代码:

output = (
        spark_event_df
        .writeStream
        .outputMode('update')
        .foreach(writerClass(**job_config_data))
        .trigger(processingTime="2 seconds")
        .start()
    )
    output.awaitTermination()

当我使用foreach()时,writerClass获得了Row,因此我无法将其转换为python中的字典。

如何从Row中的writerClass获取python数据类型(最好是字典),以便我可以根据需要进行操作并将其保存到数据库中?

apache-spark pyspark spark-structured-streaming
1个回答
1
投票

[如果您只是想将其保存为流的一部分,则可以使用foreachBatch和内置的JDBC编写器来完成。只需进行转换即可根据所需的输出模式来塑造数据,然后:

foreachBatch

如果绝对需要自定义逻辑来写入数据库,而内置JDBC编写器不支持该逻辑,则应使用DataFrame def writeBatch(input, batch_id): (input .write .format("jdbc") .option("url", url) .option("dbtable", tbl) .mode("append") .save()) output = (spark_event_df .writeStream .foreachBatch(writeBatch) .start()) output.awaitTermination() 方法批量写入行,而不是一次写入一行。如果使用此方法,则只需调用foreachPartition

就可以将Row对象转换为字典。
© www.soinside.com 2019 - 2024. All rights reserved.