PYSPARK - 在多个列上加入 nullsafe

问题描述 投票:0回答:2

假设我们有两个数据帧,我们想要将其与左反连接进行比较:

data1 = [
  (1, 11, 20, None),
  (2, 12, 22, 31),
]

data2 = [
  (1, 11, 20, None),
  (2, 12, 22, 31),
]

schema = StructType([ \
    StructField("value_1",IntegerType(), True), \
    StructField("value_2",IntegerType(), True), \
    StructField("value_3",IntegerType(), True), \
    StructField("value_4",IntegerType(), True), \
])

df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)

如何通过多个(所有)列对这些数据帧进行空安全连接? 我想出的唯一解决方案如下:

df = df1.join(df2, \
    ((df1.value_1.eqNullSafe(df2.value_1)) &
    (df1.value_2.eqNullSafe(df2.value_2)) &
    (df1.value_3.eqNullSafe(df2.value_3)) &
    (df1.value_4.eqNullSafe(df2.value_4))),
    "leftanti" \
)

但不幸的是,我们现在必须处理大量列的动态列表。 我们如何以某种方式重写此连接,以便我们可以提供要连接的列的列表。

谢谢和BR

python join pyspark apache-spark-sql databricks
2个回答
5
投票

据我理解问题陈述,您希望根据提供的列列表创建动态连接条件。我们可以使用

reduce()
模块中的
functools
来做到这一点。

join_cols = ['value_1', 'value_2', 'value_3', 'value_4']

from functools import reduce

join_condition = reduce(lambda x, y: x & y, [df1[k].eqNullSafe(df2[k]) for k in join_cols])

print(join_condition)
# Column<'((((value_1 <=> value_1) AND (value_2 <=> value_2)) AND (value_3 <=> value_3)) AND (value_4 <=> value_4))'>

您可以直接使用

join_condition
中的
.join()
参数。

df = df1.join(df2, join_condition, "leftanti")

0
投票

您可以只使用列表理解,例如上面给出的解决方案,您会得到相同的结果

df = df1.join(df2, [df1[k].eqNullSafe(df2[k]) for k in join_cols], "leftanti")

或者我更喜欢使用别名,这样我就可以立即使用处理后的数据帧,而不是先创建新变量。例如:

df = (
    df1
    .withColumn('left_or_right', lit('left'))
    .alias('left')
    .join(
        df2
        .withColumn('left_or_right', lit('right'))
        .alias('right'),
        [col(f'left.{c}').eqNullSafe(col(f'right.{c}')) for c in join_cols],
        'leftanti'
    )
)

df.sort('left.value_1').show(10,False)
© www.soinside.com 2019 - 2024. All rights reserved.