goldpanda1019

Reputation: 3

Pyspark - concatenate two dataframes based on one field

I have two dataframes. The data is too large to use pandas, so I need to do this in PySpark.

df1:

 col 1 | col 2 | col 3 |
 12345 | asd   | zxc   |
 12345 | qwe   | dfg   |
 12345 | ert   | fgh   |

df2:

 col 1 | col 2   | col 3 |
 54321 | asd     | poi   |
 54321 | qwe     | lkj   |
 54321 | ert     | mnb   |
 54321 | ytuyeye | jfg   |

I want to concatenate these dataframes vertically, keeping the same columns so the width of the table doesn't expand, and take only the rows in df2 whose col 2 value also appears in df1. In the above example, I would expect the result to have 6 rows.

In other words, df1 is a snapshot and df2 is a snapshot at a later date, and I only want to bring in the later records whose col 2 is present in df1. I've tried join and union and haven't had any luck. Thanks in advance.
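
For reference, the sample data above can be recreated with something like the following (a sketch assuming an active SparkSession named spark; the column names contain spaces, matching the tables):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Recreate the two snapshots shown above.
df1 = spark.createDataFrame(
    [(12345, "asd", "zxc"), (12345, "qwe", "dfg"), (12345, "ert", "fgh")],
    ["col 1", "col 2", "col 3"],
)
df2 = spark.createDataFrame(
    [(54321, "asd", "poi"), (54321, "qwe", "lkj"),
     (54321, "ert", "mnb"), (54321, "ytuyeye", "jfg")],
    ["col 1", "col 2", "col 3"],
)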

I got the answer:

df2_x = df2.alias('b').join(df1.alias('a'), ['col2'], "inner")\
                                             .select("*")
df3 = df1.union(df2_matches)
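
As a quick sanity check with the sample data above, the combined result should have the 6 expected rows:

df3.count()  # 6: the 3 rows from df1 plus the 3 matching rows from df2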

Upvotes: 0

Views: 451

Answers (1)

anky

Reputation: 75150

Imports:

from pyspark.sql import functions as F, Window as W, types as T

Method 1, using an inner join and then unionByName:

out1 = df1.unionByName(df1.alias("df1")
          .join(df2.alias("df2"),on='col 2',how='inner').select("df2.*"))

Method 2, using unionByName and then a window count partitioned over col 2, keeping only rows whose col 2 appears more than once (this works here because col 2 is unique within each snapshot):

w = W.partitionBy("col 2")
out2 = df1.unionByName(df2).withColumn("Counts",F.count("col 2").over(w))\
.filter("Counts>1").drop("Counts")

Let's look at the physical plans:


out1.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Scan ExistingRDD[col 1#173L,col 2#174,col 3#175]
   +- Project [col 1#192L, col 2#193, col 3#194]
      +- SortMergeJoin [col 2#426], [col 2#193], Inner
         :- Sort [col 2#426 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(col 2#426, 200), ENSURE_REQUIREMENTS, [id=#1557]
         :     +- Project [col 2#426]
         :        +- Filter isnotnull(col 2#426)
         :           +- Scan ExistingRDD[col 1#425L,col 2#426,col 3#427]
         +- Sort [col 2#193 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col 2#193, 200), ENSURE_REQUIREMENTS, [id=#1558]
               +- Filter isnotnull(col 2#193)
                  +- Scan ExistingRDD[col 1#192L,col 2#193,col 3#194]

out2.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [col 1#173L, col 2#174, col 3#175]
   +- Filter (Counts#451L > 1)
      +- Window [count(col 2#174) windowspecdefinition(col 2#174, col 2#174 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS Counts#451L], [col 2#174], [col 2#174 ASC NULLS FIRST]
         +- Sort [col 2#174 ASC NULLS FIRST, col 2#174 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col 2#174, 200), ENSURE_REQUIREMENTS, [id=#1588]
               +- Union
                  :- Scan ExistingRDD[col 1#173L,col 2#174,col 3#175]
                  +- Scan ExistingRDD[col 1#192L,col 2#193,col 3#194]

Upvotes: 1
