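I have two nearly identical PySpark DataFrames: the same number of rows and the same row ids, the same schema, but different values in some columns of each row.
I want to determine, for each row, which columns differ.
Example: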
DataFrame A
id fname lname email
1 Michael Jackson [email protected]
2 Roger Moore [email protected]
3 Angela Merkel [email protected]
DataFrame B
id fname lname email
1 Michael Jordan [email protected]
2 Gordon Moore [email protected]
3 Angela Markle [email protected]
The expected output is a list of dictionaries:
[
{"1": ["lname"]},
{"2": ["fname"] },
{"3": ["lname", "email"] }
]
df1.show()
df2.show()
+---+-------+-------+-----------------+
| id| fname| lname| email|
+---+-------+-------+-----------------+
| 1|Michael|Jackson| [email protected]|
| 2| Roger| Moore|[email protected]|
| 3| Angela| Merkel| [email protected]|
+---+-------+-------+-----------------+
+---+-------+------+-----------------+
| id| fname| lname| email|
+---+-------+------+-----------------+
| 1|Michael|Jordan| [email protected]|
| 2| Gordon| Moore|[email protected]|
| 3| Angela|Merkle| [email protected]|
+---+-------+------+-----------------+
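I basically just joined the two DataFrames on id, after renaming the columns of the second df, and compared each pair of columns: if the values are equal the column becomes null, and if they are not equal it takes the name of that column. I then built an array column from these conditions, filtered the None values out of that array, and collected the result in 2 different ways, as shown below.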
from pyspark.sql import functions as F
df3=df1.join(df2.select(F.col("id"),F.col("fname").alias("fname1"),F.col("lname").alias("lname1"),F.col("email").alias("email1")),['id'])
collected=df3.withColumn("fname1", F.when(F.col("fname1")!=F.col("fname"),F.lit("fname")).otherwise(F.lit(None)))\
.withColumn("lname1", F.when(F.col("lname1")!=F.col("lname"),F.lit("lname")).otherwise(F.lit(None)))\
.withColumn("email1", F.when(F.col("email1")!=F.col("email"),F.lit("email")).otherwise(F.lit(None)))\
.withColumn("different", F.array(*(x for x in ['fname1','lname1','email1']))).select("id","different")\
.withColumn("different", F.expr("""filter(different, x-> x is not null)""")).orderBy("id").collect()
dic={}
for i in collected:
    dic.update({i[0] : i[1]})
dic
Out[43]: {1: ['lname'], 2: ['fname'], 3: ['lname', 'email']}
Alternatively, you can collect the result as JSON (the output will look slightly different):
from pyspark.sql import functions as F
df3=df1.join(df2.select(F.col("id"),F.col("fname").alias("fname1"),F.col("lname").alias("lname1"),F.col("email").alias("email1")),['id'])
collected=df3.withColumn("fname1", F.when(F.col("fname1")!=F.col("fname"),F.lit("fname")).otherwise(F.lit(None)))\
.withColumn("lname1", F.when(F.col("lname1")!=F.col("lname"),F.lit("lname")).otherwise(F.lit(None)))\
.withColumn("email1", F.when(F.col("email1")!=F.col("email"),F.lit("email")).otherwise(F.lit(None)))\
.withColumn("different", F.array(*(x for x in ['fname1','lname1','email1']))).select("id","different")\
.withColumn("different", F.expr("""filter(different, x-> x is not null)""")).orderBy("id").toJSON().collect()
collected
Out[46]: ['{"id":1,"different":["lname"]}',
'{"id":2,"different":["fname"]}',
'{"id":3,"different":["lname","email"]}']
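For checking the result on a small sample, the same per-row comparison can be sketched in plain Python. This is only an illustration and not the Spark job itself: the helper name diff_columns is made up here, the rows are plain dicts (for a small DataFrame they could come from something like [r.asDict() for r in df.collect()]), and the email column is left out of the sample data.

```python
def diff_columns(rows_a, rows_b, key="id"):
    """Return {key value: [names of columns whose values differ]}.

    Hypothetical helper for illustration; assumes both inputs contain
    the same keys and the same columns.
    """
    # Index the second set of rows by key so each lookup is O(1).
    b_by_key = {row[key]: row for row in rows_b}
    result = {}
    for row in rows_a:
        other = b_by_key[row[key]]
        # Keep the names of the non-key columns whose values are not equal.
        result[row[key]] = [c for c in row if c != key and row[c] != other[c]]
    return result


rows_a = [
    {"id": 1, "fname": "Michael", "lname": "Jackson"},
    {"id": 2, "fname": "Roger", "lname": "Moore"},
    {"id": 3, "fname": "Angela", "lname": "Merkel"},
]
rows_b = [
    {"id": 1, "fname": "Michael", "lname": "Jordan"},
    {"id": 2, "fname": "Gordon", "lname": "Moore"},
    {"id": 3, "fname": "Angela", "lname": "Markle"},
]

diffs = diff_columns(rows_a, rows_b)

# Reshape into the list-of-dictionaries form from the question,
# with the ids as string keys.
as_list = [{str(k): v} for k, v in diffs.items()]
print(as_list)
```

The last step also applies to the dic built from the collected Spark rows above, if the list-of-dictionaries shape from the question is needed.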