Why can't I get PySpark to drop the right-hand duplicate columns after a "leftouter" join against a DataFrame that is itself the result of a join?


I have the following input DataFrames:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

expected = spark.createDataFrame(
    # fmt: off
    data=[
        {"id": "1", "group": "1", "start": 1_000_000, "stop": 1_001_200, "info": "info1"},
        {"id": "1", "group": "6", "start": 6_001_000, "stop": 6_003_330, "info": "info2"},
        {"id": "1", "group": "9", "start": 3_080_100, "stop": 3_081_000, "info": "info3"},
        {"id": "2", "group": "1", "start": 1_000_000, "stop": 1_001_200, "info": "info4"},
        {"id": "2", "group": "6", "start": 6_001_000, "stop": 6_003_330, "info": "info5"},
        {"id": "2", "group": "9", "start": 3_080_100, "stop": 3_081_000, "info": "info6"},
    ],
    # fmt: on
    schema=StructType(
        [
            StructField("id", StringType(), False),
            StructField("group", StringType(), False),
            StructField("start", IntegerType(), False),
            StructField("stop", IntegerType(), False),
            StructField("info", StringType(), False),
        ]
    ),
)

found = spark.createDataFrame(
    # fmt: off
    data=[
        {"id": "1", "group": "1", "start": 1_000_000, "stop": 1_001_200},
        {"id": "1", "group": "9", "start": 3_080_103, "stop": 3_080_500},
        {"id": "1", "group": "9", "start": 3_080_511, "stop": 3_081_000},
        {"id": "2", "group": "1", "start": 1_000_005, "stop": 1_001_200},
        {"id": "2", "group": "6", "start": 6_000_000, "stop": 6_003_009},
        {"id": "2", "group": "6", "start": 6_003_015, "stop": 6_004_000},
        {"id": "2", "group": "9", "start": 3_080_100, "stop": 3_080_500},
        {"id": "2", "group": "9", "start": 3_080_496, "stop": 3_080_996},
    ],
    # fmt: on
    schema=StructType(
        [
            StructField("id", StringType(), False),
            StructField("group", StringType(), False),
            StructField("start", IntegerType(), False),
            StructField("stop", IntegerType(), False),
        ]
    ),
)

If I join them with a "leftouter" join on "id" and "group", and drop the duplicate columns used in the join from the right-hand side, the columns are dropped and the unmatched rows get null values, as expected:

joined_left_outer_without_range = (
    expected.join(
        found,
        on=[
            expected.id == found.id,
            expected.group == found.group,
        ],
        how="leftouter",
    )
    .drop(found.id)
    .drop(found.group)
)

print("joined_left_outer_without_range:")
joined_left_outer_without_range.show()
joined_left_outer_without_range:
+---+-----+-------+-------+-----+-------+-------+
| id|group|  start|   stop| info|  start|   stop|
+---+-----+-------+-------+-----+-------+-------+
|  1|    1|1000000|1001200|info1|1000000|1001200|
|  1|    6|6001000|6003330|info2|   null|   null|
|  1|    9|3080100|3081000|info3|3080511|3081000|
|  1|    9|3080100|3081000|info3|3080103|3080500|
|  2|    1|1000000|1001200|info4|1000005|1001200|
|  2|    6|6001000|6003330|info5|6003015|6004000|
|  2|    6|6001000|6003330|info5|6000000|6003009|
|  2|    9|3080100|3081000|info6|3080496|3080996|
|  2|    9|3080100|3081000|info6|3080100|3080500|
+---+-----+-------+-------+-----+-------+-------+
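Note that even in this working case the surviving start and stop columns from found still collide by name with the left side's; only the join keys were dropped:

print(joined_left_outer_without_range.columns)
# ['id', 'group', 'start', 'stop', 'info', 'start', 'stop']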

However, if I first join the DataFrames with an "inner" range join, and then try to join the result of that join back to the original DataFrame with a "leftouter" join, attempting to drop the duplicate columns coming from the right-hand DataFrame actually drops the columns from the left-hand DataFrame and keeps the null-valued columns from the right-hand one:

joined_range_overlap = (
    expected.hint("range_join", 300).join(
        found,
        on=[
            expected.id == found.id,
            expected.group == found.group,
            expected.start < found.stop,
            expected.stop > found.start,
        ],
        how="inner",
    )
    .drop(found.id)
    .drop(found.group)
    .withColumn("found_start", found.start)
    .withColumn("found_stop", found.stop)
    .drop(found.start)
    .drop(found.stop)
)

print("joined_range_overlap:")
joined_range_overlap.show()

joined_with_missing_overlap = (
    expected.join(
        joined_range_overlap,
        on=[
            expected.id == joined_range_overlap.id,
            expected.group == joined_range_overlap.group,
            expected.start == joined_range_overlap.start,
            expected.stop == joined_range_overlap.stop,
        ],
        how="leftouter",
    )
    .drop(joined_range_overlap.id)
    .drop(joined_range_overlap.group)
    .drop(joined_range_overlap.start)
    .drop(joined_range_overlap.stop)
)

print("joined_with_missing_overlap:")
joined_with_missing_overlap.show()
joined_range_overlap:
+---+-----+-------+-------+-----+-----------+----------+
| id|group|  start|   stop| info|found_start|found_stop|
+---+-----+-------+-------+-----+-----------+----------+
|  1|    1|1000000|1001200|info1|    1000000|   1001200|
|  1|    9|3080100|3081000|info3|    3080103|   3080500|
|  1|    9|3080100|3081000|info3|    3080511|   3081000|
|  2|    1|1000000|1001200|info4|    1000005|   1001200|
|  2|    6|6001000|6003330|info5|    6000000|   6003009|
|  2|    6|6001000|6003330|info5|    6003015|   6004000|
|  2|    9|3080100|3081000|info6|    3080100|   3080500|
|  2|    9|3080100|3081000|info6|    3080496|   3080996|
+---+-----+-------+-------+-----+-----------+----------+

joined_with_missing_overlap:
+-----+----+-----+-------+-------+-----+-----------+----------+
| info|  id|group|  start|   stop| info|found_start|found_stop|
+-----+----+-----+-------+-------+-----+-----------+----------+
|info1|   1|    1|1000000|1001200|info1|    1000000|   1001200|
|info2|null| null|   null|   null| null|       null|      null|
|info3|   1|    9|3080100|3081000|info3|    3080511|   3081000|
|info3|   1|    9|3080100|3081000|info3|    3080103|   3080500|
|info4|   2|    1|1000000|1001200|info4|    1000005|   1001200|
|info5|   2|    6|6001000|6003330|info5|    6003015|   6004000|
|info5|   2|    6|6001000|6003330|info5|    6000000|   6003009|
|info6|   2|    9|3080100|3081000|info6|    3080496|   3080996|
|info6|   2|    9|3080100|3081000|info6|    3080100|   3080500|
+-----+----+-----+-------+-------+-----+-----------+----------+

Why doesn't this work as expected, and how can I get PySpark to drop the intended columns without explicitly renaming them?

Note: the reason I perform these as two separate joins is that I am trying to use the Databricks range join optimization hint, which I only apply to the "inner" range-overlap join.

python apache-spark pyspark
1 Answer

I believe this happens because, after the left outer join of expected with joined_range_overlap, you have duplicate column names, and PySpark does not drop the ones you intend. You can see the full table below:

joined_with_missing_overlap = (
    expected.join(
        joined_range_overlap,
        on=[
            expected.id == joined_range_overlap.id,
            expected.group == joined_range_overlap.group,
            expected.start == joined_range_overlap.start,
            expected.stop == joined_range_overlap.stop,
        ],
        how="leftouter",
    )
)

+---+-----+-------+-------+-----+----+-----+-------+-------+-----+-----------+----------+
| id|group|  start|   stop| info|  id|group|  start|   stop| info|found_start|found_stop|
+---+-----+-------+-------+-----+----+-----+-------+-------+-----+-----------+----------+
|  1|    1|1000000|1001200|info1|   1|    1|1000000|1001200|info1|    1000000|   1001200|
|  1|    6|6001000|6003330|info2|NULL| NULL|   NULL|   NULL| NULL|       NULL|      NULL|
|  1|    9|3080100|3081000|info3|   1|    9|3080100|3081000|info3|    3080511|   3081000|
|  1|    9|3080100|3081000|info3|   1|    9|3080100|3081000|info3|    3080103|   3080500|
|  2|    1|1000000|1001200|info4|   2|    1|1000000|1001200|info4|    1000005|   1001200|
|  2|    6|6001000|6003330|info5|   2|    6|6001000|6003330|info5|    6003015|   6004000|
|  2|    6|6001000|6003330|info5|   2|    6|6001000|6003330|info5|    6000000|   6003009|
|  2|    9|3080100|3081000|info6|   2|    9|3080100|3081000|info6|    3080496|   3080996|
|  2|    9|3080100|3081000|info6|   2|    9|3080100|3081000|info6|    3080100|   3080500|
+---+-----+-------+-------+-----+----+-----+-------+-------+-----+-----------+----------+
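Listing the column names makes the ambiguity visible; every join key (and "info") appears twice, so a drop() by Column reference cannot reliably target the right-hand copy:

print(joined_with_missing_overlap.columns)
# ['id', 'group', 'start', 'stop', 'info',
#  'id', 'group', 'start', 'stop', 'info', 'found_start', 'found_stop']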

You can use table aliases to make it work as expected:

expected = expected.alias("e")
joined_range_overlap = joined_range_overlap.alias("j")

joined_with_missing_overlap = (
    expected.join(
        joined_range_overlap,
        on=[
            expected.id == joined_range_overlap.id,
            expected.group == joined_range_overlap.group,
            expected.start == joined_range_overlap.start,
            expected.stop == joined_range_overlap.stop,
        ],
        how="leftouter",
    )
    .drop(F.col('j.id'))
    .drop(F.col('j.group'))
    .drop(F.col('j.start'))
    .drop(F.col('j.stop'))
)

joined_with_missing_overlap:
+---+-----+-------+-------+-----+-----+-----------+----------+
| id|group|  start|   stop| info| info|found_start|found_stop|
+---+-----+-------+-------+-----+-----+-----------+----------+
|  1|    1|1000000|1001200|info1|info1|    1000000|   1001200|
|  1|    6|6001000|6003330|info2| NULL|       NULL|      NULL|
|  1|    9|3080100|3081000|info3|info3|    3080511|   3081000|
|  1|    9|3080100|3081000|info3|info3|    3080103|   3080500|
|  2|    1|1000000|1001200|info4|info4|    1000005|   1001200|
|  2|    6|6001000|6003330|info5|info5|    6003015|   6004000|
|  2|    6|6001000|6003330|info5|info5|    6000000|   6003009|
|  2|    9|3080100|3081000|info6|info6|    3080496|   3080996|
|  2|    9|3080100|3081000|info6|info6|    3080100|   3080500|
+---+-----+-------+-------+-----+-----+-----------+----------+
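The aliases work because they attach fresh qualifiers ("e" and "j") to the two sides of the plan, so F.col('j.id') resolves unambiguously even though both columns ultimately trace back to expected. As an alternative sketch (not from the original answer, and assuming, as above, that both DataFrames expose the join keys under identical names), passing a list of column name strings to on performs a USING-style join that emits each key only once, so there is nothing left to drop:

# A minimal alternative sketch: joining on column *names* makes Spark
# deduplicate the join keys automatically.
joined_with_missing_overlap = expected.join(
    joined_range_overlap,
    on=["id", "group", "start", "stop"],
    how="leftouter",
)
# Resulting columns: id, group, start, stop, info (from the left side),
# then info, found_start, found_stop (from the right side) -- the two
# "info" columns still share a name and would need renaming to tell apart.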