Scala:处理数据帧,而列中的值满足条件

问题描述 投票:1回答:1

我必须处理一个巨大的数据帧,通过数据帧的id列从服务下载文件。下载的逻辑和所有的更改都已准备好,但我不确定绕这个做出循环的最佳方法是什么。我在Databricks上运行它,这就是我需要以块的形式执行进程的原因。

数据框有一个“status”列,可以包含以下值:

“todo”,“处理”,“失败”,“成功”

在while循环中,我想执行以下任务:

while (there are rows with status "todo") {
   - get the first 10 rows if status is todo (DONE)
   - start processing the dataframe, update status to processing (DONE)
   - download files (call UDF), update status to succeeded or failed
     (DONE, not in the code here)
}

我想运行这个直到所有行'status是其他todo!问题是这个while循环没有完成,因为数据帧本身没有更新。它需要分配给另一个数据帧,但是如何将新的数据帧添加到循环中?

我的代码现在:

while(statusDoc.where("status == 'todo'").count > 0) {
  val todoDF = test.filter("status == 'todo'")

  val processingDF = todoDF.limit(10).withColumn("status", when(col("status") === "todo", "processing")
                           .otherwise(col("status")))

 statusDoc.join(processingDF, Seq("id"), "outer")
      .select($"id", \
       statusDoc("fileUrl"), \
       coalesce(processingDF("status"), statusDoc("status")).alias("status"))

}

联接应该是这样的:

val update = statusDoc.join(processingDF, Seq("id"), "outer")
                          .select($"id", statusDoc("fileUrl"),\
    coalesce(processingDF("status"), statusDoc("status")).alias("status"))

然后这个新的update数据帧应该用于下一轮循环。

scala apache-spark dataframe while-loop do-while
1个回答
1
投票

这里要记住的一件事是DataFrame(Spark)不可变,因为它们是分布式的。如果你做了一些,你不能保证给定的修改会在所有执行者网络中正确传播。而且您也无法保证某个特定部分的数据尚未在其他地方使用(例如,在另一个节点中)。

但您可以做的一件事是添加另一个包含更新值的列并删除旧列。

val update = statusDoc.
    .withColumnRenamed("status", "status_doc")
    .join(processingDF, Seq("id"), "outer")
    .withColumn("updated_status", udf((stold: String, stold: String) => if (stnew != null) stnew else stold).apply(col("status"), col("status_doc"))
    .drop("status_doc", "status")
    .withColumnRenamed("updated_status", "status")
    .select("id", "fileUrl", "status")

然后确保将“statusDoc”替换为“update”DataFrame。不要忘记将DataFrame设为“var”而不是“val”。我很惊讶你的IDE还没有大喊大叫。

另外,我确信你可以想出一种分配问题的方法,以便你避免使用while循环 - 我可以帮助你做到这一点,但我需要更清楚地描述你的问题。如果使用while循环,则不会使用集群的全部功能,因为while循环仅在主节点上执行。然后,每次只能处理10行。我确信您可以在单个地图操作中将所需的所有数据附加到整个DataFrame。

© www.soinside.com 2019 - 2024. All rights reserved.