我正在 Scala 中使用 Apache Spark 开发一个项目,在尝试迭代 DataFrame 的行并从 CSV 文件的列中提取值时遇到问题。
这是我正在使用的代码:
import org.apache.spark.sql.SparkSession
object ExampleDataFrameIteration {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("ExampleDataFrameIteration").getOrCreate()
// Load the CSV file
val df = spark.read.csv("path/to/file.csv")
// Iterate over the rows
df.foreach { row =>
// Extract values from columns
val number = row.getAs[String]("number")
val account = row.getAs[String]("account")
// Show the values
println(s"number: $number, Account: $account")
}
spark.stop()
}
}
但是,运行代码时,我没有在控制台中获得预期的输出。即使 CSV 文件具有有效内容, println 似乎也没有显示任何内容。
我已经检查了 CSV 文件的路径,并且确定它是正确的。此外,我确认 CSV 文件的标题行包含列名称“number”和“account”。
有人可以帮助我确定可能导致此问题的原因以及如何解决它吗?
预先感谢您的任何帮助或建议!
Spark 触发的作业是从驱动程序发起的,由于运行应用程序的分布式特性,操作在执行程序上运行,并将结果返回给驱动程序。
这意味着如果您想以
println
的形式获取结果,则必须使用 collect
等操作将结果带回驱动程序节点。
如果您用
foreach
补充 collect
操作,如下所示,您应该能够看到预期的数据:
df.collect.foreach { row =>
// Extract values from columns
val number = row.getAs[String]("number")
val account = row.getAs[String]("account")
// Show the values
println(s"number: $number, Account: $account")
}