我必须处理一个巨大的数据帧,通过数据帧的id
列从服务下载文件。下载的逻辑和所有的更改都已准备好,但我不确定绕这个做出循环的最佳方法是什么。我在Databricks上运行它,这就是我需要以块的形式执行进程的原因。
数据框有一个“status”列,可以包含以下值:
“todo”,“处理”,“失败”,“成功”
在while循环中,我想执行以下任务:
while (there are rows with status "todo") {
- get the first 10 rows if status is todo (DONE)
- start processing the dataframe, update status to processing (DONE)
- download files (call UDF), update status to succeeded or failed
(DONE, not in the code here)
}
我想运行这个直到所有行'status
是其他todo
!问题是这个while循环没有完成,因为数据帧本身没有更新。它需要分配给另一个数据帧,但是如何将新的数据帧添加到循环中?
我的代码现在:
while(statusDoc.where("status == 'todo'").count > 0) {
val todoDF = test.filter("status == 'todo'")
val processingDF = todoDF.limit(10).withColumn("status", when(col("status") === "todo", "processing")
.otherwise(col("status")))
statusDoc.join(processingDF, Seq("id"), "outer")
.select($"id", \
statusDoc("fileUrl"), \
coalesce(processingDF("status"), statusDoc("status")).alias("status"))
}
联接应该是这样的:
val update = statusDoc.join(processingDF, Seq("id"), "outer")
.select($"id", statusDoc("fileUrl"),\
coalesce(processingDF("status"), statusDoc("status")).alias("status"))
然后这个新的update
数据帧应该用于下一轮循环。
这里要记住的一件事是DataFrame(Spark)不可变,因为它们是分布式的。如果你做了一些,你不能保证给定的修改会在所有执行者网络中正确传播。而且您也无法保证某个特定部分的数据尚未在其他地方使用(例如,在另一个节点中)。
但您可以做的一件事是添加另一个包含更新值的列并删除旧列。
val update = statusDoc.
.withColumnRenamed("status", "status_doc")
.join(processingDF, Seq("id"), "outer")
.withColumn("updated_status", udf((stold: String, stold: String) => if (stnew != null) stnew else stold).apply(col("status"), col("status_doc"))
.drop("status_doc", "status")
.withColumnRenamed("updated_status", "status")
.select("id", "fileUrl", "status")
然后确保将“statusDoc”替换为“update”DataFrame。不要忘记将DataFrame设为“var”而不是“val”。我很惊讶你的IDE还没有大喊大叫。
另外,我确信你可以想出一种分配问题的方法,以便你避免使用while循环 - 我可以帮助你做到这一点,但我需要更清楚地描述你的问题。如果使用while循环,则不会使用集群的全部功能,因为while循环仅在主节点上执行。然后,每次只能处理10行。我确信您可以在单个地图操作中将所需的所有数据附加到整个DataFrame。