Getting the columns whose values differ between two dataframes


I have two nearly identical PySpark dataframes: the same number of rows with the same row ids, the same schema, but for some rows the values in certain columns differ.

For each row, I want to identify which columns have differing values.

Example:

Dataframe A

id  fname   lname   email
1   Michael Jackson [email protected]
2   Roger   Moore   [email protected]
3   Angela  Merkel  [email protected]

Dataframe B

id  fname   lname   email
1   Michael Jordan  [email protected]
2   Gordon  Moore   [email protected]
3   Angela  Markle  [email protected]

The expected output is a list of dictionaries:

[ 
  {"1": ["lname"]}, 
  {"2": ["fname"] }, 
  {"3": ["lname", "email"] }
]
python pyspark apache-spark-sql pyspark-sql
1 Answer
df1.show()
df2.show()

+---+-------+-------+-----------------+
| id|  fname|  lname|            email|
+---+-------+-------+-----------------+
|  1|Michael|Jackson|     [email protected]|
|  2|  Roger|  Moore|[email protected]|
|  3| Angela| Merkel|         [email protected]|
+---+-------+-------+-----------------+

+---+-------+------+-----------------+
| id|  fname| lname|            email|
+---+-------+------+-----------------+
|  1|Michael|Jordan|     [email protected]|
|  2| Gordon| Moore|[email protected]|
|  3| Angela|Merkle|        [email protected]|
+---+-------+------+-----------------+
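
For reference, the two example dataframes can be reconstructed like this (a minimal sketch; the email values are placeholders, since the real addresses are redacted in the post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder emails: rows 1 and 2 match across dataframes, row 3 differs,
# consistent with the expected output in the question.
df1 = spark.createDataFrame(
    [(1, "Michael", "Jackson", "mj@example.com"),
     (2, "Roger", "Moore", "rm@example.com"),
     (3, "Angela", "Merkel", "am@example.com")],
    ["id", "fname", "lname", "email"])

df2 = spark.createDataFrame(
    [(1, "Michael", "Jordan", "mj@example.com"),
     (2, "Gordon", "Moore", "rm@example.com"),
     (3, "Angela", "Markle", "am2@example.com")],
    ["id", "fname", "lname", "email"])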

I basically just joined on id (renaming the columns of the second dataframe first) and compared the column pairs: when two values are equal the result is null, and when they differ it is the column's name. From those conditions I built an array column, filtered the null values out of that array, and then collected the result in two different ways, as shown below.

from pyspark.sql import functions as F

# Join on id, renaming the second dataframe's columns to avoid name clashes.
df3 = df1.join(df2.select(F.col("id"), F.col("fname").alias("fname1"), F.col("lname").alias("lname1"), F.col("email").alias("email1")), ["id"])

# For each column pair, keep the column name where the values differ (null otherwise),
# pack the results into an array, then drop the nulls with the filter() SQL function
# (a higher-order function available in Spark 2.4+).
collected = df3.withColumn("fname1", F.when(F.col("fname1") != F.col("fname"), F.lit("fname")).otherwise(F.lit(None)))\
  .withColumn("lname1", F.when(F.col("lname1") != F.col("lname"), F.lit("lname")).otherwise(F.lit(None)))\
  .withColumn("email1", F.when(F.col("email1") != F.col("email"), F.lit("email")).otherwise(F.lit(None)))\
  .withColumn("different", F.array("fname1", "lname1", "email1")).select("id", "different")\
  .withColumn("different", F.expr("filter(different, x -> x is not null)")).orderBy("id").collect()

# Build a single {id: [differing columns]} dict from the collected rows.
dic = {}
for i in collected:
    dic[i[0]] = i[1]


dic

Out[43]: {1: ['lname'], 2: ['fname'], 3: ['lname', 'email']}
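
If you have more columns, the same when/array/filter chain can be built programmatically instead of writing one withColumn per field. A sketch, assuming every column other than the join key "id" should be compared:

from pyspark.sql import functions as F

compare_cols = [c for c in df1.columns if c != "id"]  # assumes "id" is the join key

# Rename the second dataframe's columns with a "1" suffix and join on id.
joined = df1.join(df2.select([F.col("id")] + [F.col(c).alias(c + "1") for c in compare_cols]), ["id"])

# One when() expression per column: the column name where values differ, else null.
diff_exprs = [F.when(F.col(c + "1") != F.col(c), F.lit(c)).otherwise(F.lit(None)) for c in compare_cols]
result = joined.withColumn("different", F.array(*diff_exprs))\
    .withColumn("different", F.expr("filter(different, x -> x is not null)"))\
    .select("id", "different").orderBy("id")

result.show(truncate=False)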

Or you can collect it as JSON (the output will look a bit different):

from pyspark.sql import functions as F

df3 = df1.join(df2.select(F.col("id"), F.col("fname").alias("fname1"), F.col("lname").alias("lname1"), F.col("email").alias("email1")), ["id"])
collected = df3.withColumn("fname1", F.when(F.col("fname1") != F.col("fname"), F.lit("fname")).otherwise(F.lit(None)))\
  .withColumn("lname1", F.when(F.col("lname1") != F.col("lname"), F.lit("lname")).otherwise(F.lit(None)))\
  .withColumn("email1", F.when(F.col("email1") != F.col("email"), F.lit("email")).otherwise(F.lit(None)))\
  .withColumn("different", F.array("fname1", "lname1", "email1")).select("id", "different")\
  .withColumn("different", F.expr("filter(different, x -> x is not null)")).orderBy("id").toJSON().collect()

collected

Out[46]: ['{"id":1,"different":["lname"]}',
 '{"id":2,"different":["fname"]}',
 '{"id":3,"different":["lname","email"]}']