在 Spark Scala 中迭代 DataFrame 行并从 CSV 文件中提取值时出现问题

问题描述 投票:0回答:1

我正在 Scala 中使用 Apache Spark 开发一个项目,在尝试迭代 DataFrame 的行并从 CSV 文件的列中提取值时遇到问题。

这是我正在使用的代码:

import org.apache.spark.sql.SparkSession

object ExampleDataFrameIteration {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("ExampleDataFrameIteration").getOrCreate()

    // Load the CSV file
    val df = spark.read.csv("path/to/file.csv")

    // Iterate over the rows
    df.foreach { row =>
      // Extract values from columns
      val number = row.getAs[String]("number")
      val account = row.getAs[String]("account")

      // Show the values
      println(s"number: $number, Account: $account")
    }

    spark.stop()
  }
}

但是,运行代码时,我没有在控制台中获得预期的输出。即使 CSV 文件具有有效内容, println 似乎也没有显示任何内容。

我已经检查了 CSV 文件的路径,并且确定它是正确的。此外,我确认 CSV 文件的标题行包含列名称“number”和“account”。

有人可以帮助我确定可能导致此问题的原因以及如何解决它吗?

预先感谢您的任何帮助或建议!

scala loops csv apache-spark
1个回答
0
投票

Spark 触发的作业是从驱动程序发起的,由于运行应用程序的分布式特性,操作在执行程序上运行,并将结果返回给驱动程序。

这意味着如果您想以

println
的形式获取结果,则必须使用
collect
等操作将结果带回驱动程序节点。

如果您用

foreach
补充
collect
操作,如下所示,您应该能够看到预期的数据:

df.collect.foreach { row =>
      // Extract values from columns
      val number = row.getAs[String]("number")
      val account = row.getAs[String]("account")

      // Show the values
      println(s"number: $number, Account: $account")
    }
© www.soinside.com 2019 - 2024. All rights reserved.