迭代Spark数据帧中的行和列

问题描述 投票:12回答:4

我有以下动态创建的Spark数据帧:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

现在,我需要迭代sqlDF中的每一行和列来打印每一列,这是我的尝试:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

row是类型Row,但是不可迭代,这就是为什么这段代码在row.foreach中抛出编译错误的原因。如何迭代Row中的每一列?

scala apache-spark apache-spark-sql spark-dataframe
4个回答
9
投票

你可以用RowSeq转换成toSeq。一旦转向Seq,你可以像往常一样用foreachmap或任何你需要的东西迭代它

    sqlDF.foreach { row => 
           row.toSeq.foreach{col => println(col) }
    }

输出:

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40

9
投票

考虑你有下面的Dataframe

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+

要循环使用Dataframe并从Dataframe中提取元素,您可以选择以下方法之一。

方法1 - 使用foreach循环

不能使用foreach循环直接循环数据帧。为此,首先必须使用case class定义数据帧的模式,然后必须将此模式指定给数据帧。

import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))

请看下面的结果:

enter image description here

方法2 - 使用rdd循环

在Dataframe上使用rdd.collectrow变量将包含rdd行类型的每一行Dataframe。要从行中获取每个元素,请使用row.mkString(","),它将包含逗号分隔值中每行的值。使用split函数(内置函数),您可以使用索引访问rdd行的每个列值。

for (row <- df.rdd.collect)
{   
    var name = row.mkString(",").split(",")(0)
    var sector = row.mkString(",").split(",")(1)
    var age = row.mkString(",").split(",")(2)   
}

请注意,这种方法有两个缺点。 1.如果列值中有,,则数据将错误地拆分到相邻列。 2. rdd.collect是一个action,它将所有数据返回给驱动程序的内存,其中驱动程序的内存可能没有那么大的数据来保存数据,最终导致应用程序失败。

我建议使用方法1。

方法3 - 使用where和select

您可以直接使用whereselect,它将在内部循环并查找数据。由于它不应该抛出索引超出绑定的异常,因此使用了if条件

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
    name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString

方法4 - 使用临时表

您可以将数据帧注册为临时表,该表将存储在spark存储器中。然后,您可以像使用其他数据库一样使用select查询来查询数据,然后收集并保存在变量中

df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")

1
投票

你应该在你的mkString上使用Row

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}

但请注意,这将打印在执行程序JVM的内部,因此通常您将看不到输出(除非您使用master = local)


1
投票

sqlDF.foreach不适合我,但来自@Sarath Avanavu的方法1回答有效,但它也在播放记录的顺序。

我找到了另一种有效的方法

df.collect().foreach { row =>
   println(row.mkString(","))
}
© www.soinside.com 2019 - 2024. All rights reserved.