How do I transpose a dataframe in Spark?


I have the following dataframe:

+--------------------+---+---+-----+----+--------+----+
|                  ak| 1 | 2 |  3  | 4  |   5    |  6 |
+--------------------+---+---+-----+----+--------+----+
|8dce120638dbdf438   |  2|  1|    0|   0|       0|   0|
|3fd28484316249e95   |  1|  0|    3|   1|       4|   5|
|3636b43f64db33889   |  9|  3|    3|   4|      18|  11|
+--------------------+---+---+-----+----+--------+----+

I want to transform it into the following:

ak                 depth    user_count
8dce120638dbdf438    1       2
8dce120638dbdf438    2       1
8dce120638dbdf438    3       0
8dce120638dbdf438    4       0
8dce120638dbdf438    5       0
8dce120638dbdf438    6       0
3fd28484316249e95    1       1
3fd28484316249e95    2       0
3fd28484316249e95    3       3
3fd28484316249e95    4       1
3fd28484316249e95    5       4
3fd28484316249e95    6       5
3636b43f64db33889    1       9
3636b43f64db33889    2       3
3636b43f64db33889    3       3
3636b43f64db33889    4       4
3636b43f64db33889    5       18
3636b43f64db33889    6       11

How can I do this in Scala?

scala apache-spark-sql transpose
2 Answers

2 votes

Similar to @Ramesh Maharjan's approach, but without a UDF: instead, use Spark's built-in `array` and `struct` functions to build an array column that can then be exploded:

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.types._

// per column name, create a struct (similar to a tuple) of the column name and value:
def arrayItem(name: String) = struct(lit(name) cast IntegerType as "depth", $"$name" as "user_count")

// create an array of these per column, explode it and select the relevant columns:
df.withColumn("tmp", explode(array(df.columns.tail.map(arrayItem): _*)))
  .select($"ak", $"tmp.depth", $"tmp.user_count")
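
For reference, the sample dataframe from the question can be recreated locally to try either snippet (a minimal sketch; it assumes an active `SparkSession` named `spark`):

```scala
import spark.implicits._

// Recreate the question's sample data; the numeric column names "1".."6"
// are the "depth" levels that get unpivoted into rows.
val df = Seq(
  ("8dce120638dbdf438", 2, 1, 0, 0, 0, 0),
  ("3fd28484316249e95", 1, 0, 3, 1, 4, 5),
  ("3636b43f64db33889", 9, 3, 3, 4, 18, 11)
).toDF("ak", "1", "2", "3", "4", "5", "6")
```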

2 votes

The solution is straightforward: zip the column names with their values into an array column, use the `explode` function to split each element of that array into its own row, and finally separate the key and value into individual columns.

Turning the explanation above into code, with comments:

val columns = df.columns.tail   //selecting columns to be changed to rows

import org.apache.spark.sql.functions._
// UDF that zips the column names with the corresponding row values,
// returning an array of (name, value) tuples
def zipUdf = udf((cols: collection.mutable.WrappedArray[String], vals: collection.mutable.WrappedArray[String]) => cols.zip(vals))

df.select(col("ak"), zipUdf(lit(columns), array(columns.map(col): _*)).as("depth"))   //calling udf function above
    .withColumn("depth", explode(col("depth")))                                       //exploding the array column to be on separate rows
    .select(col("ak"), col("depth._1").as("depth"), col("depth._2").as("user_count")) //selecting columns as required in output
  .show(false)

You should get the following output:

+-----------------+-----+----------+
|ak               |depth|user_count|
+-----------------+-----+----------+
|8dce120638dbdf438|1    |2         |
|8dce120638dbdf438|2    |1         |
|8dce120638dbdf438|3    |0         |
|8dce120638dbdf438|4    |0         |
|8dce120638dbdf438|5    |0         |
|8dce120638dbdf438|6    |0         |
|3fd28484316249e95|1    |1         |
|3fd28484316249e95|2    |0         |
|3fd28484316249e95|3    |3         |
|3fd28484316249e95|4    |1         |
|3fd28484316249e95|5    |4         |
|3fd28484316249e95|6    |5         |
|3636b43f64db33889|1    |9         |
|3636b43f64db33889|2    |3         |
|3636b43f64db33889|3    |3         |
|3636b43f64db33889|4    |4         |
|3636b43f64db33889|5    |18        |
|3636b43f64db33889|6    |11        |
+-----------------+-----+----------+
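
As a side note (not part of either answer), the same unpivot can also be expressed with Spark SQL's built-in `stack` function, which avoids both the UDF and the explicit struct construction; this sketch assumes the same `df` as above:

```scala
// stack(n, k1, v1, k2, v2, ...) emits n output rows per input row,
// pairing each literal depth with the matching (backtick-quoted) column.
df.selectExpr(
    "ak",
    "stack(6, 1, `1`, 2, `2`, 3, `3`, 4, `4`, 5, `5`, 6, `6`) as (depth, user_count)"
  )
  .show(false)
```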