Pick the same column from multiple rows into multiple columns of a single row [duplicate]

Problem description · Votes: 0 · Answers: 3

This question already has answers here:

I have the two DataFrames below.

MasterDF

(MasterDF shown as an image in the original post)

NumberDF (created from a Hive load)

(NumberDF shown as an image in the original post)

Desired output:

(desired output shown as an image in the original post)

Population logic (sketched as Spark filters after the list):

  1. For Field1, pick sch_id where CAT = 'PAY' and SUB_CAT = 'client'
  2. For Field2, pick sch_id where CAT = 'PAY' and SUB_CAT = 'phr'
  3. For Field3, pick pay_id where CAT = 'credit' and SUB_CAT = 'spGrp'
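
Expressed as Spark filters, a minimal sketch (assuming NumberDF carries an id column alongside CAT, SUB_CAT, sch_id, and pay_id; the val names are hypothetical):

import org.apache.spark.sql.functions.col

// One intermediate frame per rule above (hypothetical names).
val field1Rows = NumberDF.filter(col("CAT") === "PAY"    && col("SUB_CAT") === "client").select("id", "sch_id")
val field2Rows = NumberDF.filter(col("CAT") === "PAY"    && col("SUB_CAT") === "phr").select("id", "sch_id")
val field3Rows = NumberDF.filter(col("CAT") === "credit" && col("SUB_CAT") === "spGrp").select("id", "pay_id")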

Currently, before each join, I filter NumberDF and select the value, e.g.:

 masterDF.as("master")
   .join(
     NumberDF.filter(col("CAT") === "PAY" && col("SUB_CAT") === "phr").as("number"),
     $"master.id" === $"number.id",
     "leftouter")
   .select($"master.*", $"number.sch_id".as("field1"))

The above approach requires one join per field. I looked into the pivot function, but couldn't work out how to apply it to my problem.

Note: please ignore any syntax errors in the code.

apache-spark hive apache-spark-2.0
3 Answers
0 votes

A better solution is to pivot the DataFrame (numberDF) on the subject column before joining it with studentDF.

The PySpark code looks like this:

numberDF = spark.createDataFrame([(1, "Math", 80), (1, "English", 60), (1, "Science", 80)], ["id", "subject", "marks"])
studentDF = spark.createDataFrame([(1, "Vikas")],["id","name"])

>>> numberDF.show()
+---+-------+-----+
| id|subject|marks|
+---+-------+-----+
|  1|   Math|   80|
|  1|English|   60|
|  1|Science|   80|
+---+-------+-----+

>>> studentDF.show()
+---+-----+
| id| name|
+---+-----+
|  1|Vikas|
+---+-----+

pivotNumberDF = numberDF.groupBy("id").pivot("subject").sum("marks")

>>> pivotNumberDF.show()
+---+-------+----+-------+
| id|English|Math|Science|
+---+-------+----+-------+
|  1|     60|  80|     80|
+---+-------+----+-------+

>>> studentDF.join(pivotNumberDF, "id").show()
+---+-----+-------+----+-------+
| id| name|English|Math|Science|
+---+-----+-------+----+-------+
|  1|Vikas|     60|  80|     80|
+---+-----+-------+----+-------+
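
Since the question itself is in Scala, roughly the same pivot-then-join can be sketched as follows (same toy data as above; assumes a spark-shell session where spark and its implicits are available):

import spark.implicits._

// Toy data mirroring the PySpark example above.
val numberDF  = Seq((1, "Math", 80), (1, "English", 60), (1, "Science", 80)).toDF("id", "subject", "marks")
val studentDF = Seq((1, "Vikas")).toDF("id", "name")

// Pivot subjects into columns, then a single join on id.
val pivotNumberDF = numberDF.groupBy("id").pivot("subject").sum("marks")
studentDF.join(pivotNumberDF, Seq("id")).show()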


ref: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html


0 votes

In the end I implemented it using pivot:

flights.groupBy("ID", "CAT")
  .pivot("SUB_CAT", Seq("client", "phr", "spGrp"))
  .agg(avg("SCH_ID").as("SCH_ID"), avg("pay_id").as("pay_id"))
  .groupBy("ID")
  .pivot("CAT", Seq("credit", "pay"))
  .agg(
    avg("client_SCH_ID").as("client_sch_id"), avg("client_pay_id").as("client_pay_id"),
    avg("phr_SCH_ID").as("phr_SCH_ID"), avg("phr_pay_id").as("phr_pay_id"),
    avg("spGrp_SCH_ID").as("spGrp_SCH_ID"), avg("spGrp_pay_id").as("spGrp_pay_id")
  )

The first pivot returns a table like this:

+---+------+-------------+-------------+----------+----------+------------+------------+
| ID|   CAT|client_SCH_ID|client_pay_id|phr_SCH_ID|phr_pay_id|spGrp_SCH_ID|spGrp_pay_id|
+---+------+-------------+-------------+----------+----------+------------+------------+
|  1|credit|          5.0|        105.0|       4.0|     104.0|         6.0|       106.0|
|  1|   pay|          2.0|        102.0|       1.0|     101.0|         3.0|       103.0|
+---+------+-------------+-------------+----------+----------+------------+------------+

After the second pivot it looks like this:

+---+--------------------+--------------------+-----------------+-----------------+-------------------+-------------------+-----------------+-----------------+--------------+--------------+----------------+----------------+
| ID|credit_client_sch_id|credit_client_pay_id|credit_phr_SCH_ID|credit_phr_pay_id|credit_spGrp_SCH_ID|credit_spGrp_pay_id|pay_client_sch_id|pay_client_pay_id|pay_phr_SCH_ID|pay_phr_pay_id|pay_spGrp_SCH_ID|pay_spGrp_pay_id|
+---+--------------------+--------------------+-----------------+-----------------+-------------------+-------------------+-----------------+-----------------+--------------+--------------+----------------+----------------+
|  1|                 5.0|               105.0|              4.0|            104.0|                6.0|              106.0|              2.0|            102.0|           1.0|         101.0|             3.0|           103.0|
+---+--------------------+--------------------+-----------------+-----------------+-------------------+-------------------+-----------------+-----------------+--------------+--------------+----------------+----------------+

I'm not sure about the performance, though.
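
One performance note: passing explicit value lists to pivot, as done above, spares Spark a separate job to compute the distinct values of the pivot column. If only the three target fields are needed, a final select keeps the result narrow; a minimal sketch, assuming the two-step pivot above was assigned to a val named result (hypothetical name):

// `result` is a hypothetical name for the two-step pivot result above.
result.select(
  $"ID",
  $"pay_client_sch_id".as("Field1"),
  $"pay_phr_SCH_ID".as("Field2"),
  $"credit_spGrp_pay_id".as("Field3")
).show()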


-3 votes

numberDF.createOrReplaceTempView("NumberDF")

masterDF.createOrReplaceTempView("MasterDf")

val sqlDF = spark.sql("""
  select m.id, t1.fld1, t2.fld2, t3.fld3, m.otherfields
  from MasterDf m
  left join (select id, sch_id as fld1 from NumberDF
             where cat = 'pay' and sub_cat = 'client') t1 on m.id = t1.id
  left join (select id, sch_id as fld2 from NumberDF
             where cat = 'pay' and sub_cat = 'phr') t2 on m.id = t2.id
  left join (select id, pay_id as fld3 from NumberDF
             where cat = 'credit' and sub_cat = 'spGrp') t3 on m.id = t3.id
""")

sqlDF.show()
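
A variant that scans NumberDF only once replaces the three derived tables with conditional aggregation (a sketch; it assumes at most one matching row per id and rule, otherwise max picks an arbitrary one):

val sqlDF2 = spark.sql("""
  select m.id, n.fld1, n.fld2, n.fld3, m.otherfields
  from MasterDf m
  left join (
    select id,
           max(case when cat = 'pay'    and sub_cat = 'client' then sch_id end) as fld1,
           max(case when cat = 'pay'    and sub_cat = 'phr'    then sch_id end) as fld2,
           max(case when cat = 'credit' and sub_cat = 'spGrp'  then pay_id end) as fld3
    from NumberDF
    group by id
  ) n on m.id = n.id
""")

sqlDF2.show()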