我有以下输入数据框:
# Reference frame: one row per (id, group) with its interval [start, stop] and a label.
expected = spark.createDataFrame(
    data=[
        dict(zip(["id", "group", "start", "stop", "info"], row))
        for row in [
            ("1", "1", 1_000_000, 1_001_200, "info1"),
            ("1", "6", 6_001_000, 6_003_330, "info2"),
            ("1", "9", 3_080_100, 3_081_000, "info3"),
            ("2", "1", 1_000_000, 1_001_200, "info4"),
            ("2", "6", 6_001_000, 6_003_330, "info5"),
            ("2", "9", 3_080_100, 3_081_000, "info6"),
        ]
    ],
    # All fields non-nullable; column order matches the row tuples above.
    schema=StructType(
        [
            StructField(name, dtype(), False)
            for name, dtype in [
                ("id", StringType),
                ("group", StringType),
                ("start", IntegerType),
                ("stop", IntegerType),
                ("info", StringType),
            ]
        ]
    ),
)
# Observed frame: intervals found per (id, group); may be split, shifted,
# or missing compared to `expected`, and carries no "info" column.
found = spark.createDataFrame(
    data=[
        dict(zip(["id", "group", "start", "stop"], row))
        for row in [
            ("1", "1", 1_000_000, 1_001_200),
            ("1", "9", 3_080_103, 3_080_500),
            ("1", "9", 3_080_511, 3_081_000),
            ("2", "1", 1_000_005, 1_001_200),
            ("2", "6", 6_000_000, 6_003_009),
            ("2", "6", 6_003_015, 6_004_000),
            ("2", "9", 3_080_100, 3_080_500),
            ("2", "9", 3_080_496, 3_080_996),
        ]
    ],
    # All fields non-nullable; column order matches the row tuples above.
    schema=StructType(
        [
            StructField(name, dtype(), False)
            for name, dtype in [
                ("id", StringType),
                ("group", StringType),
                ("start", IntegerType),
                ("stop", IntegerType),
            ]
        ]
    ),
)
如果我使用“id”和“group”对它们进行“leftouter”连接,并删除来自右侧(found)的重复连接列,那么在没有匹配的行上,右侧剩余的列会如预期那样显示
null
值:
# Left-outer join on (id, group), then drop the right-hand join columns.
# Dropping by DataFrame attribute (found.id / found.group) works here
# because `found` is a distinct, independent DataFrame, so Spark can
# resolve which of the two duplicate "id"/"group" columns is meant.
joined_left_outer_without_range = (
    expected.join(
        found,
        on=[
            expected.id == found.id,
            expected.group == found.group,
        ],
        how="leftouter",
    )
    .drop(found.id)
    .drop(found.group)
)
print("joined_left_outer_without_range:")
# Unmatched left rows (e.g. id=1, group=6) show null in found's start/stop.
joined_left_outer_without_range.show()
joined_left_outer_without_range:
+---+-----+-------+-------+-----+-------+-------+
| id|group| start| stop| info| start| stop|
+---+-----+-------+-------+-----+-------+-------+
| 1| 1|1000000|1001200|info1|1000000|1001200|
| 1| 6|6001000|6003330|info2| null| null|
| 1| 9|3080100|3081000|info3|3080511|3081000|
| 1| 9|3080100|3081000|info3|3080103|3080500|
| 2| 1|1000000|1001200|info4|1000005|1001200|
| 2| 6|6001000|6003330|info5|6003015|6004000|
| 2| 6|6001000|6003330|info5|6000000|6003009|
| 2| 9|3080100|3081000|info6|3080496|3080996|
| 2| 9|3080100|3081000|info6|3080100|3080500|
+---+-----+-------+-------+-----+-------+-------+
但是,如果我先用“内部”范围连接(range join)得到一个中间结果,再尝试用“leftouter”连接把该结果与原始数据框
expected
连接起来,并删除来自右侧数据框的重复列,那么实际上被删除的是左侧数据框的列,而来自右侧、带有
null
值的列却被保留了下来:
# Step 1: inner range join — keep (expected, found) pairs within the same
# (id, group) whose [start, stop] intervals overlap.
# The "range_join" hint (bin size 300) is the Databricks range-join
# optimization; per the note at the end of the question it is only applied
# to inner joins, which is why this is done as a separate first step.
joined_range_overlap = (
    expected.hint("range_join", 300).join(
        found,
        on=[
            expected.id == found.id,
            expected.group == found.group,
            # Strict interval overlap: expected.start < found.stop AND
            # expected.stop > found.start.
            expected.start < found.stop,
            expected.stop > found.start,
        ],
        how="inner",
    )
    .drop(found.id)
    .drop(found.group)
    # Rename found's interval bounds before dropping the originals, so the
    # result keeps expected's start/stop plus found_start/found_stop.
    .withColumn("found_start", found.start)
    .withColumn("found_stop", found.stop)
    .drop(found.start)
    .drop(found.stop)
)
print("joined_range_overlap:")
joined_range_overlap.show()
# Step 2: left-outer join `expected` back against the range-join result to
# recover rows with no overlap (e.g. id=1, group=6).
# NOTE(review): joined_range_overlap is derived FROM expected, so the
# Column objects joined_range_overlap.id / .group / .start / .stop share
# lineage with expected's columns. The show() output below indicates the
# drops resolve against the LEFT side — expected's columns get dropped and
# the right-hand (nullable) duplicates are kept, which is the bug being asked about.
joined_with_missing_overlap = (
    expected.join(
        joined_range_overlap,
        on=[
            expected.id == joined_range_overlap.id,
            expected.group == joined_range_overlap.group,
            expected.start == joined_range_overlap.start,
            expected.stop == joined_range_overlap.stop,
        ],
        how="leftouter",
    )
    .drop(joined_range_overlap.id)
    .drop(joined_range_overlap.group)
    .drop(joined_range_overlap.start)
    .drop(joined_range_overlap.stop)
)
print("joined_with_missing_overlap:")
joined_with_missing_overlap.show()
joined_range_overlap:
+---+-----+-------+-------+-----+-----------+----------+
| id|group| start| stop| info|found_start|found_stop|
+---+-----+-------+-------+-----+-----------+----------+
| 1| 1|1000000|1001200|info1| 1000000| 1001200|
| 1| 9|3080100|3081000|info3| 3080103| 3080500|
| 1| 9|3080100|3081000|info3| 3080511| 3081000|
| 2| 1|1000000|1001200|info4| 1000005| 1001200|
| 2| 6|6001000|6003330|info5| 6000000| 6003009|
| 2| 6|6001000|6003330|info5| 6003015| 6004000|
| 2| 9|3080100|3081000|info6| 3080100| 3080500|
| 2| 9|3080100|3081000|info6| 3080496| 3080996|
+---+-----+-------+-------+-----+-----------+----------+
joined_with_missing_overlap:
+-----+----+-----+-------+-------+-----+-----------+----------+
| info| id|group| start| stop| info|found_start|found_stop|
+-----+----+-----+-------+-------+-----+-----------+----------+
|info1| 1| 1|1000000|1001200|info1| 1000000| 1001200|
|info2|null| null| null| null| null| null| null|
|info3| 1| 9|3080100|3081000|info3| 3080511| 3081000|
|info3| 1| 9|3080100|3081000|info3| 3080103| 3080500|
|info4| 2| 1|1000000|1001200|info4| 1000005| 1001200|
|info5| 2| 6|6001000|6003330|info5| 6003015| 6004000|
|info5| 2| 6|6001000|6003330|info5| 6000000| 6003009|
|info6| 2| 9|3080100|3081000|info6| 3080496| 3080996|
|info6| 2| 9|3080100|3081000|info6| 3080100| 3080500|
+-----+----+-----+-------+-------+-----+-----------+----------+
为什么这没有按预期工作,如何让 PySpark 删除预期的列而不显式重命名列?
注意:我将这些作为两个单独的连接进行的原因是我尝试使用 Databricks 范围连接优化提示,仅针对“内部”范围重叠连接执行。
我相信发生这种情况是因为在对
expected
和 joined_range_overlap
做左外连接之后,结果中存在重复的列名,而 PySpark 无法按您的意图删除其中特定的那一列。您可以查看下面的完整表格:
# Same join as in the question, but WITHOUT the drops, to show the raw
# result: both sides contribute id/group/start/stop/info, yielding
# ambiguous duplicate column names (see table below).
joined_with_missing_overlap = (
    expected.join(
        joined_range_overlap,
        on=[
            expected.id == joined_range_overlap.id,
            expected.group == joined_range_overlap.group,
            expected.start == joined_range_overlap.start,
            expected.stop == joined_range_overlap.stop,
        ],
        how="leftouter",
    )
)
+---+-----+-------+-------+-----+----+-----+-------+-------+-----+-----------+----------+
| id|group| start| stop| info| id|group| start| stop| info|found_start|found_stop|
+---+-----+-------+-------+-----+----+-----+-------+-------+-----+-----------+----------+
| 1| 1|1000000|1001200|info1| 1| 1|1000000|1001200|info1| 1000000| 1001200|
| 1| 6|6001000|6003330|info2|NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 1| 9|3080100|3081000|info3| 1| 9|3080100|3081000|info3| 3080511| 3081000|
| 1| 9|3080100|3081000|info3| 1| 9|3080100|3081000|info3| 3080103| 3080500|
| 2| 1|1000000|1001200|info4| 2| 1|1000000|1001200|info4| 1000005| 1001200|
| 2| 6|6001000|6003330|info5| 2| 6|6001000|6003330|info5| 6003015| 6004000|
| 2| 6|6001000|6003330|info5| 2| 6|6001000|6003330|info5| 6000000| 6003009|
| 2| 9|3080100|3081000|info6| 2| 9|3080100|3081000|info6| 3080496| 3080996|
| 2| 9|3080100|3081000|info6| 2| 9|3080100|3081000|info6| 3080100| 3080500|
+---+-----+-------+-------+-----+----+-----+-------+-------+-----+-----------+----------+
您可以使用表别名来使其按预期工作:
# Fix: give each side an explicit alias, then drop by qualified column name
# ("j.<col>") so Spark can unambiguously resolve which duplicate to remove.
# F is assumed to be `pyspark.sql.functions`.
expected = expected.alias("e")
joined_range_overlap = joined_range_overlap.alias("j")
joined_with_missing_overlap = (
    expected.join(
        joined_range_overlap,
        on=[
            expected.id == joined_range_overlap.id,
            expected.group == joined_range_overlap.group,
            expected.start == joined_range_overlap.start,
            expected.stop == joined_range_overlap.stop,
        ],
        how="leftouter",
    )
    # Qualified drops target the right-hand ("j") side only.
    .drop(F.col('j.id'))
    .drop(F.col('j.group'))
    .drop(F.col('j.start'))
    .drop(F.col('j.stop'))
)
joined_with_missing_overlap:
+---+-----+-------+-------+-----+-----+-----------+----------+
| id|group| start| stop| info| info|found_start|found_stop|
+---+-----+-------+-------+-----+-----+-----------+----------+
| 1| 1|1000000|1001200|info1|info1| 1000000| 1001200|
| 1| 6|6001000|6003330|info2| NULL| NULL| NULL|
| 1| 9|3080100|3081000|info3|info3| 3080511| 3081000|
| 1| 9|3080100|3081000|info3|info3| 3080103| 3080500|
| 2| 1|1000000|1001200|info4|info4| 1000005| 1001200|
| 2| 6|6001000|6003330|info5|info5| 6003015| 6004000|
| 2| 6|6001000|6003330|info5|info5| 6000000| 6003009|
| 2| 9|3080100|3081000|info6|info6| 3080496| 3080996|
| 2| 9|3080100|3081000|info6|info6| 3080100| 3080500|
+---+-----+-------+-------+-----+-----+-----------+----------+