Reputation: 3
I have two dataframes. The data is too large for pandas, so I need to do this in PySpark.
df1:
col 1 | col 2 | col 3 |
12345 | asd | zxc |
12345 | qwe | dfg |
12345 | ert | fgh |
df2:
col 1 | col 2 | col 3 |
54321 | asd | poi |
54321 | qwe | lkj |
54321 | ert | mnb |
54321 | ytuyeye | jfg |
I want to concatenate these dataframes on the same columns, so the width of the table doesn't expand, and only take rows from df2 that have a matching col 2 in df1. In the above example, I would expect the result to have 6 rows.
In other words, df1 is a snapshot and df2 is a snapshot at a later date, and I only want to bring in records from df2 whose col 2 is present in df1. I've tried join and union and haven't had any luck. Thanks in advance.
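For reference, a minimal sketch that builds these example dataframes (assuming a SparkSession named spark is available):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data matching the tables above
df1 = spark.createDataFrame(
    [(12345, "asd", "zxc"), (12345, "qwe", "dfg"), (12345, "ert", "fgh")],
    ["col 1", "col 2", "col 3"])
df2 = spark.createDataFrame(
    [(54321, "asd", "poi"), (54321, "qwe", "lkj"),
     (54321, "ert", "mnb"), (54321, "ytuyeye", "jfg")],
    ["col 1", "col 2", "col 3"])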
I got the answer:
# keep only df2 rows whose col 2 also appears in df1, and only df2's columns
df2_matches = df2.alias('b').join(df1.alias('a'), ['col 2'], "inner")\
    .select("b.*")
# unionByName matches columns by name, so the result keeps df1's schema
df3 = df1.unionByName(df2_matches)
Upvotes: 0
Views: 451
Reputation: 75150
Imports:
from pyspark.sql import functions as F, Window as W, types as T
Method 1, using join and union:
out1 = df1.unionByName(df1.alias("df1")
            .join(df2.alias("df2"), on="col 2", how="inner")
            .select("df2.*"))  # df2.* keeps only df2's columns, so the width doesn't expand
Method 2, using union and then a window to count rows partitioned over col 2, keeping only rows where the count > 1:
w = W.partitionBy("col 2")
out2 = (df1.unionByName(df2)
        .withColumn("Counts", F.count("col 2").over(w))
        .filter("Counts > 1")  # keep rows whose col 2 appears more than once in the union
        .drop("Counts"))
Let's look at the physical plans:
out1.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
:- Scan ExistingRDD[col 1#173L,col 2#174,col 3#175]
+- Project [col 1#192L, col 2#193, col 3#194]
+- SortMergeJoin [col 2#426], [col 2#193], Inner
:- Sort [col 2#426 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col 2#426, 200), ENSURE_REQUIREMENTS, [id=#1557]
: +- Project [col 2#426]
: +- Filter isnotnull(col 2#426)
: +- Scan ExistingRDD[col 1#425L,col 2#426,col 3#427]
+- Sort [col 2#193 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col 2#193, 200), ENSURE_REQUIREMENTS, [id=#1558]
+- Filter isnotnull(col 2#193)
+- Scan ExistingRDD[col 1#192L,col 2#193,col 3#194]
out2.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [col 1#173L, col 2#174, col 3#175]
+- Filter (Counts#451L > 1)
+- Window [count(col 2#174) windowspecdefinition(col 2#174, col 2#174 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS Counts#451L], [col 2#174], [col 2#174 ASC NULLS FIRST]
+- Sort [col 2#174 ASC NULLS FIRST, col 2#174 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col 2#174, 200), ENSURE_REQUIREMENTS, [id=#1588]
+- Union
:- Scan ExistingRDD[col 1#173L,col 2#174,col 3#175]
+- Scan ExistingRDD[col 1#192L,col 2#193,col 3#194]
Upvotes: 1