所以我知道如何来比较符合使用减法两个数据帧和删除行。那也行。
我知道如何工会不匹配与不匹配的两个表的结果创建一个新的DF值。
我想不出该怎么做才能为符合(独自离开ID列)使用pyspark以分布式的方式是那么空值
例:
df_as_list = [['id','name','monthly_sales'],
[101,'John Snow', 1234.56],
[102,'Daenerys Targaryen', 9294.96],
[103,'Saul Goodman', 1274.57],
[104,'Bobby Axelrob', 1123459.56],
[105,'Joe Miller', 34.56],
[106,'James Holden', 1.23]]
my_schema = df_as_list.pop(0)
df1 = spark.createDataFrame(df_as_list, my_schema)
df_as_list = [['id','name','monthly_sales'],
[101,'John Snow', 777.56],
[102,'Daenerys Targaryen', 9294.96],
[103,'Saul Goodman', 1274.57],
[104,'Bobby Axelrob', 1123459.56],
[105,'Joe Miller', 34.56],
[1106,'James Holden', 1.23]]
my_schema = df_as_list.pop(0)
df2 = spark.createDataFrame(df_as_list, my_schema)
df1.show()
df2.show()
所需的输出:
+---+------------------+-------------+
| id| name|monthly_sales|
+---+------------------+-------------+
|101| | 1234.56|
|101| | 777.56|
+---+------------------+-------------+
一种方法是先找到那里有差异id
s,并找出哪些列是相等的:
from functools import reduce
diffs = df1.join(df2, on="id")\
.where(reduce(lambda a, b: a|b, [df1[c] != df2[c] for c in df1.columns]))\
.select("id", *[(df1[c] == df2[c]).alias(c) for c in df1.columns if c != "id"])
diffs.show()
#+---+----+-------------+
#| id|name|monthly_sales|
#+---+----+-------------+
#|101|true| false|
#+---+----+-------------+
条件reduce(lambda a, b: a|b, [df1[c] != df2[c] for c in df1.columns])
将只保留行,其中至少一列是两个DataFrames之间不同。
现在使用diffs
加入两个DataFrames的工会,如果你想,如果他们是相同的,以显示该列或null
使用布尔值来确定。
from pyspark.sql.functions import when, col, lit
df1.union(df2).alias("u")\
.join(diffs.alias("d"), on="id")\
.select(
"id",
*[
when(
col("d."+c),
lit(None)
).otherwise(col("u."+c)).alias(c)
for c in diffs.columns
if c != "id"
]
)\
.show()
#+---+----+-------------+
#| id|name|monthly_sales|
#+---+----+-------------+
#|101|null| 777.56|
#|101|null| 1234.56|
#+---+----+-------------+
你将不得不把null
在匹配列(而不是空字符串),因为该类型的列必须是一致的(除非你施放一切字符串)。