Pratik Rudra

Reputation: 47

How to merge dataframes keeping order in spark or Python

I am trying to merge two dataframes while keeping the order.

The first dataframe has these values:

>>> df_branch1.show(10,False)
+------------------------+
|col                     |
+------------------------+
|Sorter_SAMPLE_CUSTOMER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+

The second dataframe has these values:

>>> df_branch2.show(10,False)
+------------------------+                                                      
|col                     |
+------------------------+
|Sorter_CUSTOMER_MASTER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+

I want to merge the two dataframes and expect the original order to be preserved.

Expected output:

+------------------------+                                                      
|col                     |
+------------------------+
|Sorter_SAMPLE_CUSTOMER  |
|Sorter_CUSTOMER_MASTER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+

Any solution in PySpark or Python will do.
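
For reference, a minimal sketch that reproduces the two input dataframes (assuming an existing SparkSession named spark; names follow the question):

from pyspark.sql.types import StringType

df_branch1 = spark.createDataFrame(
    ['Sorter_SAMPLE_CUSTOMER', 'Join_Source_Target', 'Exp_DetectChanges',
     'Filter_Unchanged_Records', 'Router_UPDATE_INSERT', 'Seq_Unique_Key'],
    StringType()).toDF('col')

df_branch2 = spark.createDataFrame(
    ['Sorter_CUSTOMER_MASTER', 'Join_Source_Target', 'Exp_DetectChanges',
     'Filter_Unchanged_Records', 'Router_UPDATE_INSERT', 'Seq_Unique_Key'],
    StringType()).toDF('col')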

Upvotes: 1

Views: 1398

Answers (2)

Ged

Reputation: 18003

This solution uses zipWithIndex; I'm not convinced by the monotonically_increasing_id approach. I have another solution as well, but as I'm pushed for time, here it is.

from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType, LongType
import pyspark.sql.functions as F

# Sample data; with an atomic StringType schema the single column is named "value"
df1 = spark.createDataFrame(['abc', '2', '3', '4'], StringType())
df2 = spark.createDataFrame(['abc', '2a', '3', '4'], StringType())

# Common schema; the index tagging could be factored into a def (a sketch follows the output), but pushed for time
schema = StructType(df1.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = df1.rdd.zipWithIndex()
rdd1 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df1 = spark.createDataFrame(rdd1, schema)
df1 = df1.withColumn("t", F.lit(1))
rdd = df2.rdd.zipWithIndex()
rdd2 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df2 = spark.createDataFrame(rdd2, schema)
df2 = df2.withColumn("t", F.lit(2))

# Assumption: df1 always contains all the base values; the goal is to pick up the extras from df2 and position them directly after their counterparts
# Functional solution; performance may be an issue. Could also use collect_list etc., but using SQL here
# Does not consider the case where df1 has fewer values than df2

df1.createOrReplaceTempView("data1")
df2.createOrReplaceTempView("data2")

df3 = spark.sql('''select * from data2 d2  
                where exists   
                 (select d1.value from data1 d1
                   where d1.index = d2.index
                     and d1.value <> d2.value)
               ''')

dfRES = df1.union(df3).orderBy("index", "t").drop(*['index', 't'])
dfRES.show(truncate=False)

This returns the final DataFrame with the ordering preserved; no distinct is required:

+-----+
|value|
+-----+
|abc  |
|2    |
|2a   |
|3    |
|4    |
+-----+
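
The index tagging above is written out twice; as the code comment notes, it could be factored into a def. A minimal sketch of such a helper (the name with_position is my own, not from the answer):

from pyspark.sql.types import StructField, StructType, LongType
import pyspark.sql.functions as F

def with_position(df, tag):
    # append a positional "index" column via zipWithIndex, plus a literal source tag "t"
    schema = StructType(df.schema.fields[:] + [StructField("index", LongType(), True)])
    rdd = df.rdd.zipWithIndex().map(
        lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
    return spark.createDataFrame(rdd, schema).withColumn("t", F.lit(tag))

# df1 = with_position(df1, 1); df2 = with_position(df2, 2)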

UPD

Although the question is vague, this solution also caters for repeating values, if they exist, e.g.:

df1 = spark.createDataFrame(['abc', '2', '3', '4', 'abc', '2', '3', '4', 'abc', '2', '3', '4'], StringType())
df2 = spark.createDataFrame(['abc', '2a', '3', '4', 'abc', '2b', '3', '4', 'abc', '2c', '3c', '4'], StringType())

Upvotes: 1

YOLO

Reputation: 21709

Here's a way to do it using a key column:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# create a key column
d1 = d1.withColumn("key", F.monotonically_increasing_id())
d2 = d2.withColumn("key", F.monotonically_increasing_id())

# concat data
d3 = d1.union(d2)

# sort by key
d3 = d3.orderBy('key').drop('key')

# dedupe: keep only the first occurrence (by key) of each value
w = Window.partitionBy("col1").orderBy("key")
d4 = d3.withColumn("key", F.monotonically_increasing_id())
d4 = (d4
     .withColumn("dupe", F.row_number().over(w))
     .where("dupe == 1")
     .orderBy("key")
     .drop(*['key', 'dupe']))

d4.show()

+------------------------+
|col1                    |
+------------------------+
|Sorter_SAMPLE_CUSTOMER  |
|Sorter_CUSTOMER_MASTER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+
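
Note: this assumes d1 and d2 are the question's df_branch1 and df_branch2 with the column named col1, and that the generated keys line up row-for-row across the two dataframes. monotonically_increasing_id only yields consecutive ids within a partition, so for small inputs a single partition keeps the keys aligned, e.g. a possible setup:

# assumption: the inputs are small enough to fit a single partition
d1 = df_branch1.toDF('col1').coalesce(1)
d2 = df_branch2.toDF('col1').coalesce(1)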

Upvotes: 1
