Reputation: 107
In a Spark DataFrame, I have one column whose rows each contain a list of lists. I want to merge the inner lists of strings into a single list.
INPUT DATAFRAME:
+-------+---------------------+
| name  |friends              |
+-------+---------------------+
| Jim   |[["C","A"],["B","C"]]|
+-------+---------------------+
| Bill  |[["E","A"],["F","L"]]|
+-------+---------------------+
| Kim   |[["C","K"],["L","G"]]|
+-------+---------------------+
OUTPUT DATAFRAME:
+-------+--------------------+
| name  |friends             |
+-------+--------------------+
| Jim   |["C","A","B"]       |
+-------+--------------------+
| Bill  |["E","A","F","L"]   |
+-------+--------------------+
| Kim   |["C","K","L","G"]   |
+-------+--------------------+
I want to merge the list of lists into a single list and remove the duplicates as well. Thanks in advance.
Upvotes: 4
Views: 7660
Reputation: 9701
I think you can rely on a combination of explode to deconstruct the lists and collect_set to rebuild them:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Reuse the active session if there is one, otherwise start a new one
spark = SparkSession.builder.getOrCreate()
columns = ['name', 'friends']
data = [("Jim", [["C","A"], ["B","C"]]), ("Bill", [["E","A"], ["F","L"]]), ("Kim", [["C","K"], ["L","G"]])]
# Build the DataFrame directly from the Python records; the nested lists are inferred as array<array<string>>
spark_data = spark.createDataFrame(data, columns)
# Explode the outer list: one row per inner list
first_explode = spark_data.withColumn("first_explode", F.explode(F.col("friends")))
first_explode.show()
+----+----------------+-------------+
|name| friends|first_explode|
+----+----------------+-------------+
| Jim|[[C, A], [B, C]]| [C, A]|
| Jim|[[C, A], [B, C]]| [B, C]|
|Bill|[[E, A], [F, L]]| [E, A]|
|Bill|[[E, A], [F, L]]| [F, L]|
| Kim|[[C, K], [L, G]]| [C, K]|
| Kim|[[C, K], [L, G]]| [L, G]|
+----+----------------+-------------+
First level deconstructed. Now for the second one:
second_explode = first_explode.withColumn("second_explode", F.explode(F.col("first_explode")))
second_explode.show()
+----+----------------+-------------+--------------+
|name| friends|first_explode|second_explode|
+----+----------------+-------------+--------------+
| Jim|[[C, A], [B, C]]| [C, A]| C|
| Jim|[[C, A], [B, C]]| [C, A]| A|
| Jim|[[C, A], [B, C]]| [B, C]| B|
| Jim|[[C, A], [B, C]]| [B, C]| C|
|Bill|[[E, A], [F, L]]| [E, A]| E|
|Bill|[[E, A], [F, L]]| [E, A]| A|
|Bill|[[E, A], [F, L]]| [F, L]| F|
|Bill|[[E, A], [F, L]]| [F, L]| L|
| Kim|[[C, K], [L, G]]| [C, K]| C|
| Kim|[[C, K], [L, G]]| [C, K]| K|
| Kim|[[C, K], [L, G]]| [L, G]| L|
| Kim|[[C, K], [L, G]]| [L, G]| G|
+----+----------------+-------------+--------------+
Reconstruct the list, discarding duplicates:
grouped = second_explode.groupBy("name").agg(F.collect_set(F.col("second_explode")).alias("friends"))
grouped.show()
+----+------------+
|name| friends|
+----+------------+
| Jim| [C, B, A]|
|Bill|[F, E, A, L]|
| Kim|[K, C, G, L]|
+----+------------+
Upvotes: 6