Reputation: 65
I have two dataframes, DF1 and DF2. DF1 is the master, which stores any additional information from DF2.
Let's say DF1 has the following format:
Item Id | item | count
---------------------------
1 | item 1 | 2
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 5
DF2 contains two items that are already present in DF1 and two new entries. (Item Id and item together form a single group and can be treated as the join key.)
Item Id | item | count
---------------------------
1 | item 1 | 2
3 | item 4 | 2
4 | item 4 | 4
5 | item 5 | 2
I need to combine the two dataframes such that the counts of existing items are incremented and new items are inserted.
The result should be like:
Item Id | item | count
---------------------------
1 | item 1 | 4
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 7
4 | item 4 | 4
5 | item 5 | 2
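To pin down the intended semantics independently of Spark, here is a minimal plain-Python sketch of the merge rule described above: rows are keyed by (Item Id, item), counts for matching keys are added, and unseen keys are inserted. The function name `expected_merge` and the tuple layout are illustrative assumptions, not part of any Spark API.

```python
from collections import Counter


def expected_merge(rows1, rows2):
    """Sum counts per (item_id, item) key across both row lists."""
    totals = Counter()
    for item_id, item, count in rows1 + rows2:
        # Existing keys are incremented; new keys start from zero.
        totals[(item_id, item)] += count
    return dict(totals)


# Sample rows copied from the DF1 and DF2 tables above.
df1_rows = [(1, "item 1", 2), (2, "item 2", 3), (1, "item 3", 2), (3, "item 4", 5)]
df2_rows = [(1, "item 1", 2), (3, "item 4", 2), (4, "item 4", 4), (5, "item 5", 2)]

for (item_id, item), count in sorted(expected_merge(df1_rows, df2_rows).items()):
    print(item_id, "|", item, "|", count)
```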
I have one way to achieve this, but I'm not sure if it's efficient or the right way to do it:
temp1 = df1.join(temp,['item_id','item'],'full_outer') \
.na.fill(0)
temp1\
.groupby("item_id", "item")\
.agg(F.sum(temp1["count"] + temp1["newcount"]))\
.show()
Upvotes: 5
Views: 50976
Reputation: 635
@wandermonk's solution is recommended as it does not use a join. Avoid joins as much as possible, because a join triggers shuffling (a wide transformation that transfers data over the network, which is expensive and slow).
You also have to look at your data size (both tables big, or one small and one big, etc.) and tune the performance accordingly.
I tried showing the group-by solution using Spark SQL, as it does the same thing but is easier to understand and manipulate.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
list_1 = [[1,"item 1" , 2],[2 ,"item 2", 3],[1 ,"item 3" ,2],[3 ,"item 4" , 5]]
list_2 = [[1,"item 1",2],[3 ,"item 4",2],[4 ,"item 4",4],[5 ,"item 5",2]]
my_schema = StructType([StructField("Item_ID",IntegerType(), True),StructField("Item_Name",StringType(), True ),StructField("Quantity",IntegerType(), True)])
df1 = spark.createDataFrame(list_1, my_schema)
df2 = spark.createDataFrame(list_2, my_schema)
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
df3 = df2.union(df1)
df3.createOrReplaceTempView("df3")
df4 = spark.sql("select Item_ID, Item_Name, sum(Quantity) as Quantity from df3 group by Item_ID, Item_Name")
df4.show(10)
Now if you look at the Spark UI, you can see the shuffle operation and the number of stages, even for such a small data set.
Number of stages for such a small job
Number of shuffle operations for this group-by command
I also recommend looking at the SQL plan to understand the cost. Exchange represents the shuffle here.
== Physical Plan ==
*(2) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, Quantity#32L])
+- Exchange hashpartitioning(Item_ID#6, Item_Name#7, 200)
   +- *(1) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[partial_sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, sum#38L])
      +- Union
         :- Scan ExistingRDD[Item_ID#6,Item_Name#7,Quantity#8]
         +- Scan ExistingRDD[Item_ID#0,Item_Name#1,Quantity#2]
Upvotes: 0
Reputation: 7386
Since the schema for the two dataframes is the same, you can perform a union and then do a groupby on the id and aggregate the counts.
step1: df3 = df1.union(df2);
step2: df3.groupBy("Item Id", "item").agg(sum("count").as("count"));
Upvotes: 10
Reputation: 540
There are several ways to do it.
Based on what you describe, the most straightforward solution would be to use RDDs with SparkContext.union:
# sc.parallelize expects a local collection; a DataFrame exposes its RDD via .rdd
rdd1 = DF1.rdd
rdd2 = DF2.rdd
union_rdd = sc.union([rdd1, rdd2])
The alternative solution would be to use DataFrame.union from pyspark.sql.
Note: I suggested unionAll previously, but it is deprecated as of Spark 2.0.
Upvotes: 0