Joining two PySpark dataframes where the df1 column is IntegerType and the df2 column is ArrayType


I have two PySpark dataframes: df1 with an IntegerType column and df2 with an ArrayType column produced by collect_set.

I want to join the two so that, for each set of groups in df2, all of the corresponding rows in df1 end up in a single group.

I have df1 as follows:

+--------------------------------+---+
|ID                              |grp|
+--------------------------------+---+
|7d693086c5b8f74cbe881166cf3c2a29|2  |
|fcb907411aff4f44c599cf03d23327c0|2  |
|7933546917973caa8c2898c834446415|1  |
|3ef2e38d48a9af3e096ddd3bc3816afb|1  |
|7e18b452bb1e2845800a71d9431033b6|3  |
|9bc9d06e0efb16abde20c35ba36a2f1b|3  |
|7e18b452bb1e2845800a71d9431033b6|4  |
|ff351ada316cbb0f270f935adfd16ad4|4  |
|8919d5fd5b6fd118c1c6b691c65c9df9|6  |
.......
+--------------------------------+---+

And another dataframe df2 as follows:

+--------------------------------+-------------+
|ID                              |collected_grp|
+--------------------------------+-------------+
|fcb907411aff4f44c599cf03d23327c0|[2]          |
|ff351ada316cbb0f270f935adfd16ad4|[16, 4]      |
|9bc9d06e0efb16abde20c35ba36a2f1b|[16, 3]      |
|7e18b452bb1e2845800a71d9431033b6|[16, 3, 4]   |
|8919d5fd5b6fd118c1c6b691c65c9df9|[6, 7, 8]    |
|484f25e9ab91af2c116cd788c91bdc82|[5]          |
|8dc7dfb4466590375f1aaac7fc8cb987|[6, 8]       |
|8240cf1e442a97aa91d1029270728bbb|[5]          |
|9b93e3cfc5605e74ce2ce4c9450fd622|[7, 8]       |
|41f007c0cc45c228e246f1cc91145878|[9, 13]      |
|8f459a7cff281bad73f604166841849e|[9, 14]      |
|99f70106443a6f3f5c69d99a49d22d01|[10]         |
|f6da014449e6fa82c24d002b4a27b105|[9, 13, 14]  |
|be73ca52536d13dfea295d4fcd273fde|[10]         |
......
+--------------------------------+-------------+

I want to join df2 with df1 so that, for arrays like [16, 4] and [16, 3, 4], all the values belonging to each grp end up in a single group.

Any help would be appreciated.

Below is the code that creates the two dataframes:

from pyspark.sql import functions as F

data = [
    ['7933546917973caa8c2898c834446415', '3ef2e38d48a9af3e096ddd3bc3816afb', 1],
    ['7d693086c5b8f74cbe881166cf3c2a29', 'fcb907411aff4f44c599cf03d23327c0', 2],
    ['7e18b452bb1e2845800a71d9431033b6', '9bc9d06e0efb16abde20c35ba36a2f1b', 3],
    ['7e18b452bb1e2845800a71d9431033b6', 'ff351ada316cbb0f270f935adfd16ad4', 4],
    ['8240cf1e442a97aa91d1029270728bbb', '484f25e9ab91af2c116cd788c91bdc82', 5],
    ['8919d5fd5b6fd118c1c6b691c65c9df9', '8dc7dfb4466590375f1aaac7fc8cb987', 6],
    ['8919d5fd5b6fd118c1c6b691c65c9df9', '9b93e3cfc5605e74ce2ce4c9450fd622', 7],
    ['8dc7dfb4466590375f1aaac7fc8cb987', '9b93e3cfc5605e74ce2ce4c9450fd622', 8],
    ['8f459a7cff281bad73f604166841849e', '41f007c0cc45c228e246f1cc91145878', 9],
    ['99f70106443a6f3f5c69d99a49d22d01', 'be73ca52536d13dfea295d4fcd273fde', 10],
    ['a9781767ca4fe8fb1282ee003d2c06ac', 'cb6feb2f38731fc7832545cbe2ac881b', 11],
    ['f4901968c29e928fc7364411b03336d4', '6fa82a51f17f0bf258fe06befc661216', 12],
    ['f6da014449e6fa82c24d002b4a27b105', '41f007c0cc45c228e246f1cc91145878', 13],
    ['f6da014449e6fa82c24d002b4a27b105', '8f459a7cff281bad73f604166841849e', 14],
    ['f93c0028bb26bc9b99fca1db300c2ac1', 'ccce888c5813025e95434d7ceedf1db3', 15],
    ['ff351ada316cbb0f270f935adfd16ad4', '9bc9d06e0efb16abde20c35ba36a2f1b', 16],
    ['ffe20a2c61638bb10bf943c42b4d794f', '985e237162ccfc04874664648893c241', 17],
]

df = spark.createDataFrame(data, schema=['id1', 'id2', 'grp'])

# Self-join: attach rows whose id2 matches this row's id1 to pull in a linked ID,
# then keep the distinct IDs of each row as an array together with the original grp
df2 = df.alias('df1')\
    .join(df.alias('df2'), (F.col('df1.id1') == F.col('df2.id2')), 'left')\
    .select(F.array_distinct(F.array(F.col('df1.id1'), F.col('df1.id2'), F.col('df2.id1'), F.col('df2.id2'))).alias('ID'), F.col('df1.grp'))

# One row per (ID, grp) pair; drop the nulls introduced by the left join
df3 = df2.select(F.explode('ID').alias('ID'), 'grp').dropna()

# For every ID, collect the set of groups it appears in
df3.groupBy('ID').agg(F.collect_set('grp').alias('collected_grp')).show(40, truncate=False)

My expected output is:

+------------------------------------------------------------------------------------------------------+
|ID                                                                                                    |
+------------------------------------------------------------------------------------------------------+
|[7d693086c5b8f74cbe881166cf3c2a29, fcb907411aff4f44c599cf03d23327c0]                                  |
|[7933546917973caa8c2898c834446415, 3ef2e38d48a9af3e096ddd3bc3816afb]                                  |
|[8240cf1e442a97aa91d1029270728bbb, 484f25e9ab91af2c116cd788c91bdc82]                                  |
|[8dc7dfb4466590375f1aaac7fc8cb987, 9b93e3cfc5605e74ce2ce4c9450fd622, 8919d5fd5b6fd118c1c6b691c65c9df9]|
|[8f459a7cff281bad73f604166841849e, 41f007c0cc45c228e246f1cc91145878, f6da014449e6fa82c24d002b4a27b105]|
|[99f70106443a6f3f5c69d99a49d22d01, be73ca52536d13dfea295d4fcd273fde]                                  |
|[a9781767ca4fe8fb1282ee003d2c06ac, cb6feb2f38731fc7832545cbe2ac881b]                                  |
|[f4901968c29e928fc7364411b03336d4, 6fa82a51f17f0bf258fe06befc661216]                                  |
|[ffe20a2c61638bb10bf943c42b4d794f, 985e237162ccfc04874664648893c241]                                  |
|[ff351ada316cbb0f270f935adfd16ad4, 9bc9d06e0efb16abde20c35ba36a2f1b, 7e18b452bb1e2845800a71d9431033b6]|
|[f93c0028bb26bc9b99fca1db300c2ac1, ccce888c5813025e95434d7ceedf1db3]                                  |
+------------------------------------------------------------------------------------------------------+
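
In other words, the expected output is the set of connected components of the graph whose edges are the (id1, id2) pairs in df. A minimal sketch of that idea, assuming the edge list is small enough to collect to the driver (the union/find helpers are illustrative and not part of the original code):

from collections import defaultdict

# Collect the (id1, id2) edge list to the driver
edges = [(r.id1, r.id2) for r in df.select('id1', 'id2').collect()]

parent = {}

def find(x):
    # Return the representative of x's component, compressing the path as we go
    parent.setdefault(x, x)
    while parent[x] != x:
        parent[x] = parent[parent[x]]
        x = parent[x]
    return x

def union(a, b):
    # Merge the components containing a and b
    parent[find(a)] = find(b)

for a, b in edges:
    union(a, b)

# Group IDs by their component representative; each value is one expected ID group
components = defaultdict(list)
for node in parent:
    components[find(node)].append(node)

expected_groups = list(components.values())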
Tags: dataframe, apache-spark, pyspark, databricks, python-dedupe
1 Answer

You can try using the networkx package together with pandas to get the result. For the following input data:

+--------------------------------+----------------------+
|ID                              |exploded_collected_grp|
+--------------------------------+----------------------+
|fcb907411aff4f44c599cf03d23327c0|2                     |
|ff351ada316cbb0f270f935adfd16ad4|16                    |
|ff351ada316cbb0f270f935adfd16ad4|4                     |
|9bc9d06e0efb16abde20c35ba36a2f1b|16                    |
|9bc9d06e0efb16abde20c35ba36a2f1b|3                     |
|7e18b452bb1e2845800a71d9431033b6|16                    |
|7e18b452bb1e2845800a71d9431033b6|3                     |
|7e18b452bb1e2845800a71d9431033b6|4                     |
|8919d5fd5b6fd118c1c6b691c65c9df9|6                     |
|8919d5fd5b6fd118c1c6b691c65c9df9|7                     |
|8919d5fd5b6fd118c1c6b691c65c9df9|8                     |
|484f25e9ab91af2c116cd788c91bdc82|5                     |
|8dc7dfb4466590375f1aaac7fc8cb987|6                     |
|8dc7dfb4466590375f1aaac7fc8cb987|8                     |
|8240cf1e442a97aa91d1029270728bbb|5                     |
|9b93e3cfc5605e74ce2ce4c9450fd622|7                     |
|9b93e3cfc5605e74ce2ce4c9450fd622|8                     |
|41f007c0cc45c228e246f1cc91145878|9                     |
|41f007c0cc45c228e246f1cc91145878|13                    |
|8f459a7cff281bad73f604166841849e|9                     |
|8f459a7cff281bad73f604166841849e|14                    |
|99f70106443a6f3f5c69d99a49d22d01|10                    |
|f6da014449e6fa82c24d002b4a27b105|9                     |
|f6da014449e6fa82c24d002b4a27b105|13                    |
|f6da014449e6fa82c24d002b4a27b105|14                    |
|be73ca52536d13dfea295d4fcd273fde|10                    |
+--------------------------------+----------------------+

I ran the logic below; it first builds a graph and then finds the connected nodes that can be attached as part of one group:

import pandas as pd
import networkx as nx
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("NetworkX Example")
         .master("local")
         .getOrCreate())

data = {
    "ID": [
        "fcb907411aff4f44c599cf03d23327c0",
        "ff351ada316cbb0f270f935adfd16ad4",
        "ff351ada316cbb0f270f935adfd16ad4",
        "9bc9d06e0efb16abde20c35ba36a2f1b",
        "9bc9d06e0efb16abde20c35ba36a2f1b",
        "7e18b452bb1e2845800a71d9431033b6",
        "7e18b452bb1e2845800a71d9431033b6",
        "7e18b452bb1e2845800a71d9431033b6",
        "8919d5fd5b6fd118c1c6b691c65c9df9",
        "8919d5fd5b6fd118c1c6b691c65c9df9",
        "8919d5fd5b6fd118c1c6b691c65c9df9",
        "484f25e9ab91af2c116cd788c91bdc82",
        "8dc7dfb4466590375f1aaac7fc8cb987",
        "8dc7dfb4466590375f1aaac7fc8cb987",
        "8240cf1e442a97aa91d1029270728bbb",
        "9b93e3cfc5605e74ce2ce4c9450fd622",
        "9b93e3cfc5605e74ce2ce4c9450fd622",
        "41f007c0cc45c228e246f1cc91145878",
        "41f007c0cc45c228e246f1cc91145878",
        "8f459a7cff281bad73f604166841849e",
        "8f459a7cff281bad73f604166841849e",
        "99f70106443a6f3f5c69d99a49d22d01",
        "f6da014449e6fa82c24d002b4a27b105",
        "f6da014449e6fa82c24d002b4a27b105",
        "f6da014449e6fa82c24d002b4a27b105",
        "be73ca52536d13dfea295d4fcd273fde"
    ],
    "exploded_collected_grp": [
        2, 16, 4, 16, 3, 16, 3, 4, 6, 7, 8, 5, 6, 8, 5, 7, 8, 9, 13, 9, 14, 10, 9, 13, 14, 10
    ]
}

df = pd.DataFrame(data)

# Group by 'ID' and convert 'exploded_collected_grp' values to lists
df_grouped = df.groupby('ID')['exploded_collected_grp'].apply(list).reset_index()

# Create a dictionary where each unique list of 'exploded_collected_grp' values is associated with a list of 'ID' values
dict_groups = {}
for index, row in df_grouped.iterrows():
    key = tuple(row['exploded_collected_grp'])
    if key in dict_groups:
        dict_groups[key].append(row['ID'])
    else:
        dict_groups[key] = [row['ID']]

# Convert the dictionary to a dataframe (intermediate view; not used by the connected-components step below)
df_result = pd.DataFrame(list(dict_groups.items()), columns=['GROUPS', 'ID'])

# Create a graph from the dataframe
G = nx.Graph()
for index, row in df.iterrows():
    G.add_edge(row['ID'], row['exploded_collected_grp'])

# Find the connected components of the graph
connected_components = list(nx.connected_components(G))

# For each connected component, find the corresponding groups
result = []
for component in connected_components:
    ids = [node for node in component if isinstance(node, str)]
    groups = [node for node in component if isinstance(node, int)]
    result.append({'ID': ids, 'GROUPS': groups})

# Create a new dataframe from the connected components and their corresponding groups
df_result_nx = pd.DataFrame(result)

# convert the pandas DataFrame to a Spark DataFrame
df_result_nx_spark = spark.createDataFrame(df_result_nx)

df_result_nx_spark.show(df_result_nx_spark.count(), False)

spark.stop()

This gives me the following output:

+------------------------------------------------------------------------------------------------------+-----------+
|ID                                                                                                    |GROUPS     |
+------------------------------------------------------------------------------------------------------+-----------+
|[fcb907411aff4f44c599cf03d23327c0]                                                                    |[2]        |
|[ff351ada316cbb0f270f935adfd16ad4, 7e18b452bb1e2845800a71d9431033b6, 9bc9d06e0efb16abde20c35ba36a2f1b]|[3, 4, 16] |
|[8919d5fd5b6fd118c1c6b691c65c9df9, 8dc7dfb4466590375f1aaac7fc8cb987, 9b93e3cfc5605e74ce2ce4c9450fd622]|[6, 7, 8]  |
|[484f25e9ab91af2c116cd788c91bdc82, 8240cf1e442a97aa91d1029270728bbb]                                  |[5]        |
|[f6da014449e6fa82c24d002b4a27b105, 41f007c0cc45c228e246f1cc91145878, 8f459a7cff281bad73f604166841849e]|[9, 13, 14]|
|[99f70106443a6f3f5c69d99a49d22d01, be73ca52536d13dfea295d4fcd273fde]                                  |[10]       |
+------------------------------------------------------------------------------------------------------+-----------+
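
If the data is too large to collect into pandas and networkx on the driver, the same connected-components result can also be computed inside Spark, for example with the graphframes package, applied to the original (id1, id2, grp) dataframe df from the question. A sketch under the assumption that graphframes is installed on the cluster and that /tmp/graphframes-checkpoint is a usable checkpoint location:

from graphframes import GraphFrame
from pyspark.sql import functions as F

# Vertices: every distinct ID; edges: the (id1, id2) pairs from the original df
vertices = df.select(F.col('id1').alias('id')).union(df.select(F.col('id2').alias('id'))).distinct()
edges = df.select(F.col('id1').alias('src'), F.col('id2').alias('dst'))

# connectedComponents() requires a checkpoint directory to be set
spark.sparkContext.setCheckpointDir('/tmp/graphframes-checkpoint')

g = GraphFrame(vertices, edges)
components = g.connectedComponents()  # adds a 'component' label per vertex

components.groupBy('component') \
    .agg(F.collect_set('id').alias('ID')) \
    .show(truncate=False)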