Murali

Reputation: 65

Merge two dataframes in PySpark

I have two dataframes, DF1 and DF2. DF1 is the master and should absorb any additional information from DF2.

Let's say DF1 has the following format:

Item Id | item      | count
---------------------------
1       | item 1    | 2
2       | item 2    | 3
1       | item 3    | 2
3       | item 4    | 5

DF2 contains two items that are already present in DF1 and two new entries. (Item Id and item together form a single group and can be treated as the join key.)

Item Id | item      | count
---------------------------
1       | item 1    | 2
3       | item 4    | 2
4       | item 4    | 4
5       | item 5    | 2

I need to combine the two dataframes so that the counts of existing items are incremented and new items are inserted.

The result should be like:

Item Id | item      | count
---------------------------
1       | item 1    | 4
2       | item 2    | 3
1       | item 3    | 2
3       | item 4    | 7
4       | item 4    | 4
5       | item 5    | 2

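I have one way to achieve this, but I'm not sure whether it is efficient or the right way to do it:

from pyspark.sql import functions as F

# Assumed from the "newcount" column used below: temp is DF2 with its
# count column renamed so the two counts do not collide after the join.
temp = df2.withColumnRenamed("count", "newcount")

temp1 = df1.join(temp, ["item_id", "item"], "full_outer") \
    .na.fill(0)

temp1 \
    .groupby("item_id", "item") \
    .agg(F.sum(temp1["count"] + temp1["newcount"]).alias("count")) \
    .show()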

Upvotes: 5

Views: 50976

Answers (3)

H Roy

Reputation: 635

@wandermonk's solution is recommended because it does not use a join. Avoid joins where you can: a join triggers a shuffle (a wide transformation), which moves data across the network and is expensive and slow.

You should also consider your data sizes (both tables large, or one small and one large, and so on) and tune performance accordingly.

Below is the group-by solution written in Spark SQL; it does the same thing but is easier to understand and manipulate.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

list_1 = [[1,"item 1" , 2],[2 ,"item 2", 3],[1 ,"item 3" ,2],[3 ,"item 4" , 5]]
list_2 = [[1,"item 1",2],[3 ,"item 4",2],[4 ,"item 4",4],[5 ,"item 5",2]]

my_schema = StructType([StructField("Item_ID",IntegerType(), True),StructField("Item_Name",StringType(), True ),StructField("Quantity",IntegerType(), True)])
df1 = spark.createDataFrame(list_1, my_schema)
df2 = spark.createDataFrame(list_2, my_schema)

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

df3 = df2.union(df1)
df3.createOrReplaceTempView("df3")
df4 = spark.sql("select Item_ID, Item_Name, sum(Quantity) as Quantity from df3 group by Item_ID, Item_Name")
df4.show(10)

Now if you look at the Spark UI, you can see the shuffle operation and the number of stages, even for such a small data set.

[Image: number of stages for such a small job]

[Image: the shuffle operation for this group-by command]

I also recommend looking at the SQL plan to understand the cost. Exchange represents the shuffle here.

== Physical Plan ==
*(2) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, Quantity#32L])
+- Exchange hashpartitioning(Item_ID#6, Item_Name#7, 200)
   +- *(1) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[partial_sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, sum#38L])
      +- Union
         :- Scan ExistingRDD[Item_ID#6,Item_Name#7,Quantity#8]
         +- Scan ExistingRDD[Item_ID#0,Item_Name#1,Quantity#2]

Upvotes: 0

wandermonk

Reputation: 7386

Since the schema of the two dataframes is the same, you can perform a union and then group by the key columns and aggregate the counts (F below is pyspark.sql.functions).

step1: df3 = df1.union(df2)
step2: df3.groupBy("Item Id", "item").agg(F.sum("count").alias("count"))

Upvotes: 10

Alex Skorokhod

Reputation: 540

There are several ways to do it.

Based on what you describe, the most straightforward solution would be to use RDDs with SparkContext.union:

rdd1 = DF1.rdd
rdd2 = DF2.rdd

union_rdd = sc.union([rdd1, rdd2])

The alternative solution would be to use DataFrame.union from pyspark.sql.

Note: I have suggested unionAll previously but it is deprecated in Spark 2.0

Upvotes: 0
