bismi

Reputation: 165

Joining two PySpark dataframes where the df1 column is IntegerType and the df2 column is ArrayType

I have two PySpark dataframes: df1 with an IntegerType column and df2 with an ArrayType column produced by collect_set.

I want to join the two such that, for each set in df2, all the rows of df1 whose grp values appear in that set end up in the same group.

df1 is as below:

+--------------------------------+---+
|ID                              |grp|
+--------------------------------+---+
|7d693086c5b8f74cbe881166cf3c2a29|2  |
|fcb907411aff4f44c599cf03d23327c0|2  |
|7933546917973caa8c2898c834446415|1  |
|3ef2e38d48a9af3e096ddd3bc3816afb|1  |
|7e18b452bb1e2845800a71d9431033b6|3  |
|9bc9d06e0efb16abde20c35ba36a2f1b|3  |
|7e18b452bb1e2845800a71d9431033b6|4  |
|ff351ada316cbb0f270f935adfd16ad4|4  |
|8919d5fd5b6fd118c1c6b691c65c9df9|6  |
.......
+--------------------------------+---+

df2 is as below:

+--------------------------------+-------------+
|ID                              |collected_grp|
+--------------------------------+-------------+
|fcb907411aff4f44c599cf03d23327c0|[2]          |
|ff351ada316cbb0f270f935adfd16ad4|[16, 4]      |
|9bc9d06e0efb16abde20c35ba36a2f1b|[16, 3]      |
|7e18b452bb1e2845800a71d9431033b6|[16, 3, 4]   |
|8919d5fd5b6fd118c1c6b691c65c9df9|[6, 7, 8]    |
|484f25e9ab91af2c116cd788c91bdc82|[5]          |
|8dc7dfb4466590375f1aaac7fc8cb987|[6, 8]       |
|8240cf1e442a97aa91d1029270728bbb|[5]          |
|9b93e3cfc5605e74ce2ce4c9450fd622|[7, 8]       |
|41f007c0cc45c228e246f1cc91145878|[9, 13]      |
|8f459a7cff281bad73f604166841849e|[9, 14]      |
|99f70106443a6f3f5c69d99a49d22d01|[10]         |
|f6da014449e6fa82c24d002b4a27b105|[9, 13, 14]  |
|be73ca52536d13dfea295d4fcd273fde|[10]         |
......
+--------------------------------+-------------+

I want to join df2 with df1 such that, for arrays like [16, 4] and [16, 3, 4], all the IDs sharing any of those grp values end up in one group.
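The join I have in mind is roughly along these lines (just a sketch of the intent, assuming the two dataframes are named df1 and df2 as above):

from pyspark.sql import functions as F

# Explode df2's array so each (ID, grp) pair becomes its own row, then
# join each grp back to df1 to pull in every df1 row for that set;
# df1's ID column is renamed to avoid clashing with df2's ID
joined = df2.select('ID', F.explode('collected_grp').alias('grp'))\
    .join(df1.withColumnRenamed('ID', 'member_ID'), on='grp')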

Any help is appreciated.

Below is the code for creating both dataframes:

import pyspark.sql.functions as F
from pyspark.sql.functions import explode, collect_set

data = [
    ['7933546917973caa8c2898c834446415', '3ef2e38d48a9af3e096ddd3bc3816afb', 1],
    ['7d693086c5b8f74cbe881166cf3c2a29', 'fcb907411aff4f44c599cf03d23327c0', 2],
    ['7e18b452bb1e2845800a71d9431033b6', '9bc9d06e0efb16abde20c35ba36a2f1b', 3],
    ['7e18b452bb1e2845800a71d9431033b6', 'ff351ada316cbb0f270f935adfd16ad4', 4],
    ['8240cf1e442a97aa91d1029270728bbb', '484f25e9ab91af2c116cd788c91bdc82', 5],
    ['8919d5fd5b6fd118c1c6b691c65c9df9', '8dc7dfb4466590375f1aaac7fc8cb987', 6],
    ['8919d5fd5b6fd118c1c6b691c65c9df9', '9b93e3cfc5605e74ce2ce4c9450fd622', 7],
    ['8dc7dfb4466590375f1aaac7fc8cb987', '9b93e3cfc5605e74ce2ce4c9450fd622', 8],
    ['8f459a7cff281bad73f604166841849e', '41f007c0cc45c228e246f1cc91145878', 9],
    ['99f70106443a6f3f5c69d99a49d22d01', 'be73ca52536d13dfea295d4fcd273fde', 10],
    ['a9781767ca4fe8fb1282ee003d2c06ac', 'cb6feb2f38731fc7832545cbe2ac881b', 11],
    ['f4901968c29e928fc7364411b03336d4', '6fa82a51f17f0bf258fe06befc661216', 12],
    ['f6da014449e6fa82c24d002b4a27b105', '41f007c0cc45c228e246f1cc91145878', 13],
    ['f6da014449e6fa82c24d002b4a27b105', '8f459a7cff281bad73f604166841849e', 14],
    ['f93c0028bb26bc9b99fca1db300c2ac1', 'ccce888c5813025e95434d7ceedf1db3', 15],
    ['ff351ada316cbb0f270f935adfd16ad4', '9bc9d06e0efb16abde20c35ba36a2f1b', 16],
    ['ffe20a2c61638bb10bf943c42b4d794f', '985e237162ccfc04874664648893c241', 17],
]

df = spark.createDataFrame(data, schema=['id1', 'id2', 'grp'])

# Self-join: link each (id1, id2) pair to pairs that share an id, then
# collect the distinct ids involved into one array per row
df2 = df.alias('df1')\
    .join(df.alias('df2'), (F.col('df1.id1') == F.col('df2.id2')), 'left')\
    .select(F.array_distinct(F.array(F.col('df1.id1'), F.col('df1.id2'), F.col('df2.id1'), F.col('df2.id2'))).alias('ID'), F.col('df1.grp'))

df3 = df2.select(explode('ID').alias('ID'), 'grp').dropna()

df3.groupBy('ID').agg(collect_set('grp').alias('collected_grp')).show(40, truncate=False)

My expected output is:

+------------------------------------------------------------------------------------------------------+
|ID                                                                                                    |
+------------------------------------------------------------------------------------------------------+
|[7d693086c5b8f74cbe881166cf3c2a29, fcb907411aff4f44c599cf03d23327c0]                                  |
|[7933546917973caa8c2898c834446415, 3ef2e38d48a9af3e096ddd3bc3816afb]                                  |
|[8240cf1e442a97aa91d1029270728bbb, 484f25e9ab91af2c116cd788c91bdc82]                                  |
|[8dc7dfb4466590375f1aaac7fc8cb987, 9b93e3cfc5605e74ce2ce4c9450fd622, 8919d5fd5b6fd118c1c6b691c65c9df9]|
|[8f459a7cff281bad73f604166841849e, 41f007c0cc45c228e246f1cc91145878, f6da014449e6fa82c24d002b4a27b105]|
|[99f70106443a6f3f5c69d99a49d22d01, be73ca52536d13dfea295d4fcd273fde]                                  |
|[a9781767ca4fe8fb1282ee003d2c06ac, cb6feb2f38731fc7832545cbe2ac881b]                                  |
|[f4901968c29e928fc7364411b03336d4, 6fa82a51f17f0bf258fe06befc661216]                                  |
|[ffe20a2c61638bb10bf943c42b4d794f, 985e237162ccfc04874664648893c241]                                  |
|[ff351ada316cbb0f270f935adfd16ad4, 9bc9d06e0efb16abde20c35ba36a2f1b, 7e18b452bb1e2845800a71d9431033b6]|
|[f93c0028bb26bc9b99fca1db300c2ac1, ccce888c5813025e95434d7ceedf1db3]                                  |
+------------------------------------------------------------------------------------------------------+

Upvotes: 0

Views: 74

Answers (1)

Lingesh.K

Reputation: 460

You can try using the networkx package along with pandas to get to the result. For the following input data:

+--------------------------------+----------------------+
|ID                              |exploded_collected_grp|
+--------------------------------+----------------------+
|fcb907411aff4f44c599cf03d23327c0|2                     |
|ff351ada316cbb0f270f935adfd16ad4|16                    |
|ff351ada316cbb0f270f935adfd16ad4|4                     |
|9bc9d06e0efb16abde20c35ba36a2f1b|16                    |
|9bc9d06e0efb16abde20c35ba36a2f1b|3                     |
|7e18b452bb1e2845800a71d9431033b6|16                    |
|7e18b452bb1e2845800a71d9431033b6|3                     |
|7e18b452bb1e2845800a71d9431033b6|4                     |
|8919d5fd5b6fd118c1c6b691c65c9df9|6                     |
|8919d5fd5b6fd118c1c6b691c65c9df9|7                     |
|8919d5fd5b6fd118c1c6b691c65c9df9|8                     |
|484f25e9ab91af2c116cd788c91bdc82|5                     |
|8dc7dfb4466590375f1aaac7fc8cb987|6                     |
|8dc7dfb4466590375f1aaac7fc8cb987|8                     |
|8240cf1e442a97aa91d1029270728bbb|5                     |
|9b93e3cfc5605e74ce2ce4c9450fd622|7                     |
|9b93e3cfc5605e74ce2ce4c9450fd622|8                     |
|41f007c0cc45c228e246f1cc91145878|9                     |
|41f007c0cc45c228e246f1cc91145878|13                    |
|8f459a7cff281bad73f604166841849e|9                     |
|8f459a7cff281bad73f604166841849e|14                    |
|99f70106443a6f3f5c69d99a49d22d01|10                    |
|f6da014449e6fa82c24d002b4a27b105|9                     |
|f6da014449e6fa82c24d002b4a27b105|13                    |
|f6da014449e6fa82c24d002b4a27b105|14                    |
|be73ca52536d13dfea295d4fcd273fde|10                    |
+--------------------------------+----------------------+
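
For reference, this exploded form can be derived from the question's df2 along these lines (a sketch, assuming df2 as shown above):

from pyspark.sql import functions as F

# One row per (ID, grp) pair instead of one row per ID with an array
exploded = df2.select('ID', F.explode('collected_grp').alias('exploded_collected_grp'))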

I ran the following logic, which first builds a graph and then finds the connected components that form the groups (for example, grp 16 links ff351ada316cbb0f270f935adfd16ad4, 9bc9d06e0efb16abde20c35ba36a2f1b and 7e18b452bb1e2845800a71d9431033b6, which in turn pull in grps 3 and 4):

import pandas as pd
import networkx as nx
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("NetworkX Example")
         .master("local")
         .getOrCreate())

data = {
    "ID": [
        "fcb907411aff4f44c599cf03d23327c0",
        "ff351ada316cbb0f270f935adfd16ad4",
        "ff351ada316cbb0f270f935adfd16ad4",
        "9bc9d06e0efb16abde20c35ba36a2f1b",
        "9bc9d06e0efb16abde20c35ba36a2f1b",
        "7e18b452bb1e2845800a71d9431033b6",
        "7e18b452bb1e2845800a71d9431033b6",
        "7e18b452bb1e2845800a71d9431033b6",
        "8919d5fd5b6fd118c1c6b691c65c9df9",
        "8919d5fd5b6fd118c1c6b691c65c9df9",
        "8919d5fd5b6fd118c1c6b691c65c9df9",
        "484f25e9ab91af2c116cd788c91bdc82",
        "8dc7dfb4466590375f1aaac7fc8cb987",
        "8dc7dfb4466590375f1aaac7fc8cb987",
        "8240cf1e442a97aa91d1029270728bbb",
        "9b93e3cfc5605e74ce2ce4c9450fd622",
        "9b93e3cfc5605e74ce2ce4c9450fd622",
        "41f007c0cc45c228e246f1cc91145878",
        "41f007c0cc45c228e246f1cc91145878",
        "8f459a7cff281bad73f604166841849e",
        "8f459a7cff281bad73f604166841849e",
        "99f70106443a6f3f5c69d99a49d22d01",
        "f6da014449e6fa82c24d002b4a27b105",
        "f6da014449e6fa82c24d002b4a27b105",
        "f6da014449e6fa82c24d002b4a27b105",
        "be73ca52536d13dfea295d4fcd273fde"
    ],
    "exploded_collected_grp": [
        2, 16, 4, 16, 3, 16, 3, 4, 6, 7, 8, 5, 6, 8, 5, 7, 8, 9, 13, 9, 14, 10, 9, 13, 14, 10
    ]
}

df = pd.DataFrame(data)

# Group by 'ID' and convert 'exploded_collected_grp' values to lists
df_grouped = df.groupby('ID')['exploded_collected_grp'].apply(list).reset_index()

# Create a dictionary where each unique list of 'exploded_collected_grp' values is associated with a list of 'ID' values
dict_groups = {}
for index, row in df_grouped.iterrows():
    key = tuple(row['exploded_collected_grp'])
    if key in dict_groups:
        dict_groups[key].append(row['ID'])
    else:
        dict_groups[key] = [row['ID']]

# Convert the dictionary to a dataframe
df_result = pd.DataFrame(list(dict_groups.items()), columns=['GROUPS', 'ID'])
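# Note: dict_groups / df_result above are only an intermediate per-ID view;
# the final output is built from the graph and its connected components below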

# Create a bipartite graph linking each ID (str) to its grp values;
# cast to a plain Python int so the isinstance(node, int) check below
# matches even when pandas hands back numpy integer scalars
G = nx.Graph()
for index, row in df.iterrows():
    G.add_edge(row['ID'], int(row['exploded_collected_grp']))

# Find the connected components of the graph
connected_components = list(nx.connected_components(G))

# For each connected component, find the corresponding groups
result = []
for component in connected_components:
    ids = [node for node in component if isinstance(node, str)]
    groups = [node for node in component if isinstance(node, int)]
    result.append({'ID': ids, 'GROUPS': groups})

# Create a new dataframe from the connected components and their corresponding groups
df_result_nx = pd.DataFrame(result)

# convert the pandas DataFrame to a Spark DataFrame
df_result_nx_spark = spark.createDataFrame(df_result_nx)

df_result_nx_spark.show(df_result_nx_spark.count(), False)

spark.stop()

This gives me the following output:

+------------------------------------------------------------------------------------------------------+-----------+
|ID                                                                                                    |GROUPS     |
+------------------------------------------------------------------------------------------------------+-----------+
|[fcb907411aff4f44c599cf03d23327c0]                                                                    |[2]        |
|[ff351ada316cbb0f270f935adfd16ad4, 7e18b452bb1e2845800a71d9431033b6, 9bc9d06e0efb16abde20c35ba36a2f1b]|[3, 4, 16] |
|[8919d5fd5b6fd118c1c6b691c65c9df9, 8dc7dfb4466590375f1aaac7fc8cb987, 9b93e3cfc5605e74ce2ce4c9450fd622]|[6, 7, 8]  |
|[484f25e9ab91af2c116cd788c91bdc82, 8240cf1e442a97aa91d1029270728bbb]                                  |[5]        |
|[f6da014449e6fa82c24d002b4a27b105, 41f007c0cc45c228e246f1cc91145878, 8f459a7cff281bad73f604166841849e]|[9, 13, 14]|
|[99f70106443a6f3f5c69d99a49d22d01, be73ca52536d13dfea295d4fcd273fde]                                  |[10]       |
+------------------------------------------------------------------------------------------------------+-----------+
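
Note that networkx builds the graph on the driver, so this approach assumes the exploded data fits in driver memory. For larger data, a distributed alternative (a sketch, assuming the graphframes package is installed and df3 is the exploded ID/grp dataframe from the question) is GraphFrames' connectedComponents:

from pyspark.sql import functions as F
from graphframes import GraphFrame

# Vertices need an 'id' column; take both the string IDs and the grp
# values (cast to string so the column has a single type)
vertices = df3.select(F.col('ID').alias('id'))\
    .union(df3.select(F.col('grp').cast('string').alias('id')))\
    .distinct()

# One edge per (ID, grp) pair
edges = df3.select(F.col('ID').alias('src'), F.col('grp').cast('string').alias('dst'))

spark.sparkContext.setCheckpointDir('/tmp/graphframes_cc')  # required by connectedComponents
g = GraphFrame(vertices, edges)

# Every vertex gets a 'component' id; grouping by it yields the merged sets
# (each list contains both the IDs and the grp values of one component)
g.connectedComponents().groupBy('component').agg(F.collect_list('id')).show(truncate=False)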

Upvotes: 1
