Suppose I have two Spark DataFrames:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Example data for DataFrame 1
data1 = [
    ("Pool_A", "A", "X", 10),
    ("Pool_A", "A", "Y", 20),
    ("Pool_A", "B", "X", 15),
    ("Pool_B", "A", "X", 5),
    ("Pool_B", "B", "Y", 25),
]
# Define the schema for DataFrame 1
df1_schema = ["pool", "col1", "col2", "value"]
# Create DataFrame 1
df1 = spark.createDataFrame(data1, df1_schema)
# Example data for DataFrame 2
data2 = [
    ("A", "X", 100),
    ("A", "Y", 200),
    ("B", "X", 150),
    ("B", "Y", 250),
    ("C", "X", 300),
]
# Define the schema for DataFrame 2
df2_schema = ["col1", "col2", "default_value"]
# Create DataFrame 2
df2 = spark.createDataFrame(data2, df2_schema)
I want to join the two DataFrames by propagating every possible combination of "col1" and "col2" into each "pool", with the associated default "value". I have a solution using crossJoin, but I'd like to see whether there is a more elegant alternative (plus what the performance cost of crossJoin is).
Here is the desired output:
+-------+----+----+-----+
| pool|col1|col2|value|
+-------+----+----+-----+
| Pool_B| A| X| 5|
| Pool_B| B| Y| 25|
| Pool_B| C| X| 300|
| Pool_B| B| X| 150|
| Pool_B| A| Y| 200|
| Pool_A| A| X| 10|
| Pool_A| B| X| 15|
| Pool_A| A| Y| 20|
| Pool_A| B| Y| 250|
| Pool_A| C| X| 300|
+-------+----+----+-----+
With crossJoin there really is no other way: it is what produces all combinations of rows from two DataFrames. But before the crossJoin, you should build a small DataFrame containing only the distinct pools. After the crossJoin, we left-join the values from df1 and use coalesce to fill the gaps with the default values.
from pyspark.sql import functions as F

# Small DataFrame with only the distinct pools
df_pools = df1.select('pool').distinct()
# Cross join: every pool paired with every (col1, col2, default_value) row
df_comb = df_pools.crossJoin(df2)
# Left join brings in the pool-specific values where they exist
df_joined = df_comb.join(df1, ['pool', 'col1', 'col2'], 'left')
# coalesce falls back to default_value wherever df1 had no matching row
df_coalesced = df_joined.select(
    'pool', 'col1', 'col2',
    F.coalesce('value', 'default_value').alias('value')
)
df_coalesced.show()
# +------+----+----+-----+
# | pool|col1|col2|value|
# +------+----+----+-----+
# |Pool_B| A| Y| 200|
# |Pool_A| A| X| 10|
# |Pool_B| A| X| 5|
# |Pool_A| A| Y| 20|
# |Pool_A| B| Y| 250|
# |Pool_B| B| X| 150|
# |Pool_A| B| X| 15|
# |Pool_A| C| X| 300|
# |Pool_B| B| Y| 25|
# |Pool_B| C| X| 300|
# +------+----+----+-----+
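On the performance question: the cross join here is cheap because one side (df_pools) is tiny, and the output is bounded at |pools| × |df2| rows, which any correct solution must produce anyway. To see that the semantics are exactly "cross product, then left-join override, then coalesce", here is a plain-Python sketch of the same logic over the example data (a toy illustration, not Spark code):

```python
# The same example data as df1 and df2 above.
data1 = [
    ("Pool_A", "A", "X", 10),
    ("Pool_A", "A", "Y", 20),
    ("Pool_A", "B", "X", 15),
    ("Pool_B", "A", "X", 5),
    ("Pool_B", "B", "Y", 25),
]
data2 = [
    ("A", "X", 100),
    ("A", "Y", 200),
    ("B", "X", 150),
    ("B", "Y", 250),
    ("C", "X", 300),
]

# df1.select('pool').distinct()
pools = sorted({row[0] for row in data1})

# Index df1 by (pool, col1, col2) for the left join.
overrides = {(p, c1, c2): v for p, c1, c2, v in data1}

# crossJoin: every pool paired with every (col1, col2, default_value),
# then coalesce(value, default_value) via dict.get with a fallback.
result = [
    (pool, c1, c2, overrides.get((pool, c1, c2), default))
    for pool in pools
    for c1, c2, default in data2
]
# 2 pools x 5 rows of data2 = 10 rows, matching the desired output.
```

In Spark itself, if the full df2 is also small you can hint a broadcast on it (df_pools.crossJoin(F.broadcast(df2))) to avoid a shuffle; the cost only becomes a concern when both sides of the cross join are large.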