Reputation: 47
I am trying to merge two dataframes while keeping the order.
The first dataframe has these values:
>>> df_branch1.show(10,False)
+------------------------+
|col |
+------------------------+
|Sorter_SAMPLE_CUSTOMER |
|Join_Source_Target |
|Exp_DetectChanges |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT |
|Seq_Unique_Key |
+------------------------+
The second dataframe has these values:
>>> df_branch2.show(10,False)
+------------------------+
|col |
+------------------------+
|Sorter_CUSTOMER_MASTER |
|Join_Source_Target |
|Exp_DetectChanges |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT |
|Seq_Unique_Key |
+------------------------+
I want to merge the two dataframes and expect the order to be preserved.
Expected output:
+------------------------+
|col |
+------------------------+
|Sorter_SAMPLE_CUSTOMER |
|Sorter_CUSTOMER_MASTER |
|Join_Source_Target |
|Exp_DetectChanges |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT |
|Seq_Unique_Key |
+------------------------+
Any solution in PySpark or Python will do.
Upvotes: 1
Views: 1398
Reputation: 18003
This solution uses zipWithIndex; I am not convinced by the monotonically_increasing_id approach. I have another solution, but as I am pushed for time, here it is.
from pyspark.sql.types import StructType, StructField, StringType, LongType
import pyspark.sql.functions as F
df1 = spark.createDataFrame([('abc'),('2'),('3'),('4')], StringType())
df2 = spark.createDataFrame([('abc'),('2a'),('3'),('4')], StringType())
# Common schema with an index column appended; could be factored into a def, but pushed for time
schema = StructType(df1.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = df1.rdd.zipWithIndex()
rdd1 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df1 = spark.createDataFrame(rdd1, schema)
df1 = df1.withColumn("t", F.lit(1))
rdd = df2.rdd.zipWithIndex()
rdd2 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df2 = spark.createDataFrame(rdd2, schema)
df2 = df2.withColumn("t", F.lit(2))
# Assumption: df1 always has all the base values; the goal is to take the extras from df2 and position them directly after their df1 counterparts
# Functional solution; performance may be an issue, could do with collect_list etc., but using SQL here
# Does not consider the case where df1 has fewer values than df2
df1.createOrReplaceTempView("data1")
df2.createOrReplaceTempView("data2")
df3 = spark.sql('''select * from data2 d2
where exists
(select d1.value from data1 d1
where d1.index = d2.index
and d1.value <> d2.value)
''')
dfRES = df1.union(df3).orderBy("index", "t").drop(*['index', 't'])
dfRES.show(truncate=False)
This returns the final DataFrame with the ordering preserved and no distinct required:
+-----+
|value|
+-----+
|abc |
|2 |
|2a |
|3 |
|4 |
+-----+
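As a side note, the same EXISTS filter could also be written with the DataFrame API instead of SQL. This is only a sketch under the same assumptions as above (the index and t columns already added, df1 holding the base values); the names d1x, df3_alt and dfRES_alt are just illustrative:
# Keep the df2 rows whose value differs from the df1 value at the same index, then interleave as before
d1x = df1.select(F.col("index").alias("i"), F.col("value").alias("v1"))
df3_alt = (df2.join(d1x, (df2["index"] == d1x["i"]) & (df2["value"] != d1x["v1"]), "inner")
              .select(df2["value"], df2["index"], df2["t"]))
dfRES_alt = df1.union(df3_alt).orderBy("index", "t").drop("index", "t")
dfRES_alt.show(truncate=False)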
UPD: Although the question is vague, this solution also caters for repeating values, if they exist, e.g.:
df1 = spark.createDataFrame([ ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4') ], StringType())
df2 = spark.createDataFrame([ ('abc'),('2a'),('3'),('4'), ('abc'),('2b'),('3'),('4'), ('abc'),('2c'),('3c'),('4') ], StringType())
Upvotes: 1
Reputation: 21709
Here's a way to do it using a key column:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# create a key column
d1 = d1.withColumn("key", F.monotonically_increasing_id())
d2 = d2.withColumn("key", F.monotonically_increasing_id())
# concat data
d3 = d1.union(d2)
# sort by key
d3 = d3.orderBy('key').drop('key')
# window over identical values, used to keep a single row per value
w = Window().partitionBy("col1").orderBy('col1')
# re-create a key so the interleaved order can be restored after deduplication
d4 = d3.withColumn("key", F.monotonically_increasing_id())
d4 = (d4
      .withColumn("dupe", F.row_number().over(w))
      .where("dupe == 1")
      .orderBy("key")
      .drop(*['key', 'dupe']))
d4.show()
+------------------------+
|col1 |
+------------------------+
|Sorter_SAMPLE_CUSTOMER |
|Sorter_CUSTOMER_MASTER |
|Join_Source_Target |
|Exp_DetectChanges |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT |
|Seq_Unique_Key |
+------------------------+
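Since the question also allows plain Python, here is a minimal sketch that does the position-wise merge on the driver. It assumes both columns are small enough to collect and have the same length; merged and merged_df are just illustrative names:
# Collect both columns and merge position by position, appending the
# branch2 value right after the branch1 value whenever the two differ
a = [r["col"] for r in df_branch1.collect()]
b = [r["col"] for r in df_branch2.collect()]

merged = []
for x, y in zip(a, b):
    merged.append(x)
    if y != x:
        merged.append(y)

merged_df = spark.createDataFrame([(v,) for v in merged], ["col"])
merged_df.show(truncate=False)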
Upvotes: 1