Cross join two Spark DataFrames without using crossJoin


Suppose I have two Spark DataFrames:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Example data for DataFrame 1
data1 = [
    ("Pool_A", "A", "X", 10),
    ("Pool_A", "A", "Y", 20),
    ("Pool_A", "B", "X", 15),
    ("Pool_B", "A", "X", 5),
    ("Pool_B", "B", "Y", 25),
]

# Define the schema for DataFrame 1
df1_schema = ["pool", "col1", "col2", "value"]

# Create DataFrame 1
df1 = spark.createDataFrame(data1, df1_schema)

# Example data for DataFrame 2
data2 = [
    ("A", "X", 100),
    ("A", "Y", 200),
    ("B", "X", 150),
    ("B", "Y", 250),
    ("C", "X", 300),
]

# Define the schema for DataFrame 2
df2_schema = ["col1", "col2", "default_value"]

# Create DataFrame 2
df2 = spark.createDataFrame(data2, df2_schema)

I want to join the two DataFrames by propagating all possible combinations of "col1" and "col2" to each "pool", together with the default "value" associated with them. I have a solution using crossJoin, but I would like to see whether there is a more elegant solution (and what the performance cost of using crossJoin is).

Here is the desired output:

+-------+----+----+-----+
|   pool|col1|col2|value|
+-------+----+----+-----+
| Pool_B|   A|   X|    5|
| Pool_B|   B|   Y|   25|
| Pool_B|   C|   X|  300|
| Pool_B|   B|   X|  150|
| Pool_B|   A|   Y|  200|
| Pool_A|   A|   X|   10|
| Pool_A|   B|   X|   15|
| Pool_A|   A|   Y|   20|
| Pool_A|   B|   Y|  250|
| Pool_A|   C|   X|  300|
+-------+----+----+-----+
1 Answer

In distributed big-data computing there is really no way other than crossJoin to get all combinations of two different DataFrames. But before doing it, you should build a small DataFrame containing only the "pool" values. After the crossJoin, we can left-join the values from df1 and use coalesce to fill the gaps (nulls) with the default values.

from pyspark.sql import functions as F

# Small DataFrame with only the distinct pools
df_pools = df1.select('pool').distinct()
# Every (pool, col1, col2) combination together with its default value
df_comb = df_pools.crossJoin(df2)
# Bring in the actual values from df1 where they exist
df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
df_coalesced = df_joined.select(
    'pool', 'col1', 'col2',
    F.coalesce('value', 'default_value').alias('value')
)
df_coalesced.show()
# +------+----+----+-----+
# |  pool|col1|col2|value|
# +------+----+----+-----+
# |Pool_B|   A|   Y|  200|
# |Pool_A|   A|   X|   10|
# |Pool_B|   A|   X|    5|
# |Pool_A|   A|   Y|   20|
# |Pool_A|   B|   Y|  250|
# |Pool_B|   B|   X|  150|
# |Pool_A|   B|   X|   15|
# |Pool_A|   C|   X|  300|
# |Pool_B|   B|   Y|   25|
# |Pool_B|   C|   X|  300|
# +------+----+----+-----+

That being said, if you are sure that the number of pools is not very large, you can extract the values from the DataFrame as a Python list (pulling them into the driver) and send that list to the executors.

# Collect the distinct pools to the driver as a plain Python list
pools = [x[0] for x in df1.select('pool').distinct().collect()]
# Attach every pool to every row of df2 by exploding an array literal (no crossJoin)
df_comb = df2.withColumn('pool', F.explode(F.array(*[F.lit(x) for x in pools])))
df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
df_coalesced = df_joined.select(
    'pool', 'col1', 'col2',
    F.coalesce('value', 'default_value').alias('value')
)
df_coalesced.show()
# +------+----+----+-----+
# |  pool|col1|col2|value|
# +------+----+----+-----+
# |Pool_B|   A|   Y|  200|
# |Pool_A|   A|   X|   10|
# |Pool_B|   A|   X|    5|
# |Pool_A|   A|   Y|   20|
# |Pool_A|   B|   Y|  250|
# |Pool_B|   B|   X|  150|
# |Pool_A|   B|   X|   15|
# |Pool_A|   C|   X|  300|
# |Pool_B|   B|   Y|   25|
# |Pool_B|   C|   X|  300|
# +------+----+----+-----+

Note: in Spark 3.4+ you can use F.lit(pools) instead of F.array(*[F.lit(x) for x in pools]).
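
For illustration, the Spark 3.4+ variant of that line would presumably look like this (only the construction of the array literal changes; pools is the list collected above):

# Spark 3.4+ only: F.lit accepts a Python list and builds the array literal directly
df_comb = df2.withColumn('pool', F.explode(F.lit(pools)))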

This approach avoids the crossJoin.

Query plan with crossJoin:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [pool#899, col1#907, col2#908, coalesce(value#921L, default_value#909L) AS value#927L]
   +- SortMergeJoin [pool#899, col1#907, col2#908], [pool#918, col1#919, col2#920], LeftOuter
      :- Sort [pool#899 ASC NULLS FIRST, col1#907 ASC NULLS FIRST, col2#908 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(pool#899, col1#907, col2#908, 200), ENSURE_REQUIREMENTS, [plan_id=4569]
      :     +- CartesianProduct
      :        :- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
      :        :  +- Exchange hashpartitioning(pool#899, 200), ENSURE_REQUIREMENTS, [plan_id=4564]
      :        :     +- HashAggregate(keys=[pool#899], functions=[], output=[pool#899])
      :        :        +- Project [pool#899]
      :        :           +- Scan ExistingRDD[pool#899,col1#900,col2#901,value#902L]
      :        +- Scan ExistingRDD[col1#907,col2#908,default_value#909L]
      +- Sort [pool#918 ASC NULLS FIRST, col1#919 ASC NULLS FIRST, col2#920 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(pool#918, col1#919, col2#920, 200), ENSURE_REQUIREMENTS, [plan_id=4570]
            +- Filter ((isnotnull(pool#918) AND isnotnull(col1#919)) AND isnotnull(col2#920))
               +- Scan ExistingRDD[pool#918,col1#919,col2#920,value#921L]

Query plan without crossJoin (i.e., sending the list to the executors):

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [pool#1379, col1#1371, col2#1372, coalesce(value#1366L, default_value#1373L) AS value#1389L]
   +- SortMergeJoin [pool#1379, col1#1371, col2#1372], [pool#1363, col1#1364, col2#1365], LeftOuter
      :- Sort [pool#1379 ASC NULLS FIRST, col1#1371 ASC NULLS FIRST, col2#1372 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(pool#1379, col1#1371, col2#1372, 200), ENSURE_REQUIREMENTS, [plan_id=6619]
      :     +- Generate explode([Pool_A,Pool_B]), [col1#1371, col2#1372, default_value#1373L], false, [pool#1379]
      :        +- Scan ExistingRDD[col1#1371,col2#1372,default_value#1373L]
      +- Sort [pool#1363 ASC NULLS FIRST, col1#1364 ASC NULLS FIRST, col2#1365 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(pool#1363, col1#1364, col2#1365, 200), ENSURE_REQUIREMENTS, [plan_id=6620]
            +- Filter ((isnotnull(pool#1363) AND isnotnull(col1#1364)) AND isnotnull(col2#1365))
               +- Scan ExistingRDD[pool#1363,col1#1364,col2#1365,value#1366L]
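
The physical plans above were presumably obtained with DataFrame.explain; for either variant:

# Print the physical plan for the final DataFrame
df_coalesced.explain()
# or, in Spark 3.0+, a more detailed view:
# df_coalesced.explain('formatted')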