Adhi cloud

Reputation: 49

PySpark: how to combine the values of common columns into a list

I am trying to combine the values of matching columns from two dataframes into a list of values.


df1=
name | department| state | id|
-----+-----------+-------+---+
James|Sales      |NY     |101
Maria|Finance    |CA     |102
Jen  |Marketing  |NY     |103



df2=
name | department| state | id|
-----+-----------+-------+---+
James|  Sales1   |null   |101
Maria|  Finance  |       |102
Jen  |           |NY2    |103

id_matchd_df=df1.join(df2,df1.id==df2.id,"inner")
id_matchd_df.show()
#result
+--------+----------+-----+----+-----+-----------+-------+---+
|name    |department|state|id  |name | department| state | id|
+--------+----------+-----+----+-----+-----------+-------+---+
|James   |Sales     |NY   |101 |James|  Sales1   |null   |101|  
|Maria   |Finance   |CA   |102 |Maria|  Finance  |       |102|
|Jen     |Marketing |NY   |103 |Jen  |           |NY2    |103|

But I need the values of the same-named columns combined into a comma-separated list, like this:

+-----------------------------------------------------------+
|name            | department          | state      | id         | 
['James','James']|['Sales','Sales1']   |['NY',null] |['101','101']
['Maria','Maria']|['Finance','Finance']|['CA','']   |['102','102']
['Jen','Jen']    |['Marketing','']     |['NY','NY2']|['103','103']

    

Is there any solution to this?

Thanks

Upvotes: 1

Views: 1410

Answers (1)

Algamest

Reputation: 1529

TL;DR

import pyspark.sql.functions as F

# column to join on. This could become something like:
# join_col = df1.columns[0]
join_col = 'id'

joinDF = df1.join(df2, join_col, "inner")

cols = joinDF.columns
array_cols = set()

for col in cols:
    # store a reference to the new, temporary, `_array` column which we will create
    array_cols.add(f"{col}_array")
    # append the new `_array` column 
    #   using the `F.array` spark function to join the values together
    joinDF = joinDF.withColumn(col + '_array', F.array(df1[col], df2[col]))
    
# select only the joined (by `F.array`) columns and rename them back to the originals
joinDF \
    .select([F.col(ac).alias(ac.replace("_array","")) for ac in array_cols])

Full solution

Edit: to answer the follow-up details:

..i may need to iterate through columns list that we collect from different data set

and

[the] name of columns are unpredictable

union and join have been discussed in the comments...

Your original idea of joining is the right one: a union would produce 6 rows for your example, whereas you want to keep the 3 rows of the original dataframes.
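
For the follow-up about unpredictable column names, one option is to work out the shared names first and loop over only those. The helper below is a hypothetical sketch (`common_columns` is not a PySpark function); the two lists stand in for `df1.columns` and `df2.columns`, and `salary` is an invented extra column used only for illustration.

```python
def common_columns(left_cols, right_cols):
    """Return the names present in both lists, keeping the left-hand order."""
    right = set(right_cols)
    seen = set()
    shared = []
    for name in left_cols:
        # keep a name once, and only if the other dataframe also has it
        if name in right and name not in seen:
            shared.append(name)
            seen.add(name)
    return shared


# Hypothetical column lists, standing in for df1.columns and df2.columns
left = ["name", "department", "state", "id"]
right = ["id", "state", "name", "salary"]

shared = common_columns(left, right)
print(shared)
```

The resulting list could then drive the loop in place of `joinDF.columns`, so that only columns existing on both sides are passed to `F.array`.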

Starting with your data:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([
    Row(name='James', department='Sales', state='NY', id=101),
    Row(name='Maria', department='Finance', state='CA', id=102),
    Row(name='Jen', department='Marketing', state='NY', id=103),
])

df2 = spark.createDataFrame([
    # note: 'null' here is the literal string; use None for a true SQL NULL
    Row(name='James', department='Sales1', state='null', id=101),
    Row(name='Maria', department='Finance', state='', id=102),
    Row(name='Jen', department='', state='NY2', id=103),
])

df1.show()
df2.show()

gives:

+-----+----------+-----+---+
| name|department|state| id|
+-----+----------+-----+---+
|James|     Sales|   NY|101|
|Maria|   Finance|   CA|102|
|  Jen| Marketing|   NY|103|
+-----+----------+-----+---+

+-----+----------+-----+---+
| name|department|state| id|
+-----+----------+-----+---+
|James|    Sales1| null|101|
|Maria|   Finance|     |102|
|  Jen|          |  NY2|103|
+-----+----------+-----+---+

We can then join the dataframes and use the `F.array` function to combine, for each row, the two values of every column into a single array column.

import pyspark.sql.functions as F

# column to join on. This could become something like:
# join_col = df1.columns[0]
join_col = 'id'

joinDF = df1.join(df2, join_col, "inner")

cols = joinDF.columns
array_cols = set()

for col in cols:
    # store a reference to the new, temporary, `_array` column which we will create
    array_cols.add(f"{col}_array")
    # append the new `_array` column 
    #   using the `F.array` spark function to join the values together
    joinDF = joinDF.withColumn(col + '_array', F.array(df1[col], df2[col]))
    
# select only the joined (by `F.array`) columns and rename them back to the originals
joinDF \
    .select([F.col(ac).alias(ac.replace("_array","")) for ac in array_cols]) \
    .show()

gives:

+------------------+----------+--------------+----------+
|        department|     state|          name|        id|
+------------------+----------+--------------+----------+
|   [Sales, Sales1]|[NY, null]|[James, James]|[101, 101]|
|[Finance, Finance]|    [CA, ]|[Maria, Maria]|[102, 102]|
|     [Marketing, ]| [NY, NY2]|    [Jen, Jen]|[103, 103]|
+------------------+----------+--------------+----------+

Upvotes: 1
