Convert every row into a new column in Spark with Scala

Question · votes: 1 · answers: 2

I have a DataFrame with fixed columns m1_amt through m4_amt, containing data in the following format:

+------+----------+----------+----------+-----------+
|Entity| m1_amt   | m2_amt   | m3_amt   | m4_amt    |
+------+----------+----------+----------+-----------+
|  ISO | 1        | 2        | 3        |  4        |
| TEST | 5        | 6        | 7        |  8        |
| Beta | 9        | 10       | 11       | 12        |
+------+----------+----------+----------+-----------+

I am trying to turn each row into a new column:

+----------+-------+--------+------+ 
| Entity   | ISO   | TEST   | Beta | 
+----------+-------+--------+------+ 
| m1_amt   | 1     | 5      | 9    |
| m2_amt   | 2     | 6      | 10   | 
| m3_amt   | 3     | 7      | 11   |
| m4_amt   | 4     | 8      | 12   |
+----------+-------+--------+------+

How can I achieve this in Spark with Scala?

scala apache-spark apache-spark-sql apache-spark-dataset
2 Answers
1 vote
    scala> df.show
    +------+------+------+------+------+
    |Entity|m1_amt|m2_amt|m3_amt|m4_amt|
    +------+------+------+------+------+
    |   ISO|     1|     2|     3|     4|
    |  TEST|     5|     6|     7|     8|
    |  Beta|     9|    10|    11|    12|
    +------+------+------+------+------+


    scala> val df1 = df.withColumn("amt", to_json(struct(col("m1_amt"),col("m2_amt"),col("m3_amt"),col("m4_amt"))))
                       .withColumn("amt", regexp_replace(col("amt"), """[\\{\\"\\}]""", ""))
                       .withColumn("amt", explode(split(col("amt"), ",")))
                       .withColumn("cols", split(col("amt"), ":")(0))
                       .withColumn("val", split(col("amt"), ":")(1))
                       .select("Entity","cols","val")

    scala> df1.show
    +------+------+---+
    |Entity|  cols|val|
    +------+------+---+
    |   ISO|m1_amt|  1|
    |   ISO|m2_amt|  2|
    |   ISO|m3_amt|  3|
    |   ISO|m4_amt|  4|
    |  TEST|m1_amt|  5|
    |  TEST|m2_amt|  6|
    |  TEST|m3_amt|  7|
    |  TEST|m4_amt|  8|
    |  Beta|m1_amt|  9|
    |  Beta|m2_amt| 10|
    |  Beta|m3_amt| 11|
    |  Beta|m4_amt| 12|
    +------+------+---+


scala> df1.groupBy(col("cols")).pivot("Entity").agg(concat_ws("",collect_set(col("val"))))
          .withColumnRenamed("cols", "Entity")
          .show()
+------+----+---+----+
|Entity|Beta|ISO|TEST|
+------+----+---+----+
|m3_amt|  11|  3|   7|
|m4_amt|  12|  4|   8|
|m2_amt|  10|  2|   6|
|m1_amt|   9|  1|   5|
+------+----+---+----+
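
The JSON round trip plus regexp_replace gets the job done, but the same unpivot can be expressed more directly by exploding a map of column-name/value pairs. A minimal sketch under the same assumptions as the transcript above (spark-shell, where org.apache.spark.sql.functions._ is already in scope; amtCols, kvPairs, unpivoted and result are just illustrative names):

    // List the columns to unpivot and build alternating key/value pairs:
    // lit("m1_amt"), col("m1_amt"), lit("m2_amt"), col("m2_amt"), ...
    val amtCols = Seq("m1_amt", "m2_amt", "m3_amt", "m4_amt")
    val kvPairs = amtCols.flatMap(c => Seq(lit(c), col(c)))

    // explode(map(...)) emits one row per (key, value) entry; alias the
    // generator output to the same cols/val names used above
    val unpivoted = df.select(col("Entity"), explode(map(kvPairs: _*)).as(Seq("cols", "val")))

    // Pivot Entity values into columns; first() suffices because each
    // (cols, Entity) combination holds exactly one value
    val result = unpivoted.groupBy("cols").pivot("Entity").agg(first("val"))
                          .withColumnRenamed("cols", "Entity")
    result.show()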

2 votes

I tried it the following way:

    scala> val df=Seq(("ISO",1,2,3,4),
         | ("TEST",5,6,7,8),
         | ("Beta",9,10,11,12)).toDF("Entity","m1_amt","m2_amt","m3_amt","m4_amt")
    df: org.apache.spark.sql.DataFrame = [Entity: string, m1_amt: int ... 3 more fields]

    scala> df.show
    +------+------+------+------+------+
    |Entity|m1_amt|m2_amt|m3_amt|m4_amt|
    +------+------+------+------+------+
    |   ISO|     1|     2|     3|     4|
    |  TEST|     5|     6|     7|     8|
    |  Beta|     9|    10|    11|    12|
    +------+------+------+------+------+

scala> val selectDf= df.selectExpr("Entity","stack(4,'m1_amt',m1_amt,'m2_amt',m2_amt,'m3_amt',m3_amt,'m4_amt',m4_amt)")
selectDf: org.apache.spark.sql.DataFrame = [Entity: string, col0: string ... 1 more field]

scala> selectDf.show
+------+------+----+
|Entity|  col0|col1|
+------+------+----+
|   ISO|m1_amt|   1|
|   ISO|m2_amt|   2|
|   ISO|m3_amt|   3|
|   ISO|m4_amt|   4|
|  TEST|m1_amt|   5|
|  TEST|m2_amt|   6|
|  TEST|m3_amt|   7|
|  TEST|m4_amt|   8|
|  Beta|m1_amt|   9|
|  Beta|m2_amt|  10|
|  Beta|m3_amt|  11|
|  Beta|m4_amt|  12|
+------+------+----+

scala> selectDf.groupBy("col0").pivot("Entity").agg(concat_ws("",collect_list(col("col1")))).withColumnRenamed("col0","Entity").show
+------+----+---+----+
|Entity|Beta|ISO|TEST|
+------+----+---+----+
|m3_amt|  11|  3|   7|
|m4_amt|  12|  4|   8|
|m2_amt|  10|  2|   6|
|m1_amt|   9|  1|   5|
+------+----+---+----+
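
If the set of amount columns is not fixed, the stack() expression can be built from the schema instead of being hard-coded. A rough sketch along the same lines (an assumption on my part, not from the original answer; stackExpr and unpivoted are illustrative names, and the final orderBy is only there to return the metrics in m1_amt..m4_amt order):

    // Unpivot every column except Entity without hard-coding the list
    val amtCols = df.columns.filter(_ != "Entity")
    val stackExpr = amtCols
      .map(c => s"'$c', $c")
      .mkString(s"stack(${amtCols.length}, ", ", ", ") as (metric, amt)")
    val unpivoted = df.selectExpr("Entity", stackExpr)

    // Same pivot as above, with an orderBy for a deterministic row order
    unpivoted.groupBy("metric").pivot("Entity").agg(first("amt"))
             .withColumnRenamed("metric", "Entity")
             .orderBy("Entity")
             .show()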
