Vishal choudhary

Reputation: 315

Merge two columns of different DataFrames in Spark using scala

I want to merge two columns from separate DataFrames into one DataFrame.

I have two DataFrames like this:

val ds1 = sc.parallelize(Seq(1,0,1,0)).toDF("Col1")
val ds2 = sc.parallelize(Seq(234,43,341,42)).toDF("Col2")
ds1.show()

+-----+
| Col1|
+-----+
|    0|
|    1|
|    0|
|    1|
+-----+

ds2.show()
+-----+
| Col2|
+-----+
|  234|
|   43|
|  341|
|   42|
+-----+

I want a third DataFrame containing both columns, Col1 and Col2:

+-----+-----+
| Col1| Col2|
+-----+-----+
|    0|  234|
|    1|   43|
|    0|  341|
|    1|   42|
+-----+-----+

I tried union:

val ds3 = ds1.union(ds2)

But that appends all the rows of ds2 below the rows of ds1, instead of adding Col2 as a second column.

Upvotes: 0

Views: 315

Answers (1)

SanBan

Reputation: 655

monotonically_increasing_id is not deterministic: the ids it generates are unique but not consecutive, and they are not aligned across two DataFrames, so joining on them is not guaranteed to pair the rows up correctly.
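For reference, the pure-DataFrame approach that warning refers to would look roughly like this (a sketch only; withColumn and monotonically_increasing_id are standard Spark SQL APIs, but the generated ids are not guaranteed to match up row-for-row between the two DataFrames):

import org.apache.spark.sql.functions.monotonically_increasing_id

// Risky: each DataFrame gets its own ids, unique but not consecutive,
// so rows of ds1 and ds2 may not receive matching id values.
val ds1Id = ds1.withColumn("id", monotonically_increasing_id())
val ds2Id = ds2.withColumn("id", monotonically_increasing_id())

ds1Id.join(ds2Id, "id").drop("id").show()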

It is easier to drop down to the RDD API and create a join key with zipWithIndex:

val ds1 = sc.parallelize(Seq(1,0,1,0)).toDF("Col1")
val ds2 = sc.parallelize(Seq(234,43,341,42)).toDF("Col2")

// Convert to RDDs and use zipWithIndex to create the key
// (note: repartition(4) shuffles the rows first, so the index reflects the shuffled order)

val ds1Rdd = ds1.as[Int].rdd.repartition(4).zipWithIndex().map{ case (v,k) => (k,v) }

val ds2Rdd = ds2.as[Int].rdd.repartition(4).zipWithIndex().map{ case (v,k) => (k,v) }

// Check how the key-value pairs look

ds1Rdd.collect()

res50: Array[(Long, Int)] = Array((0,0), (1,1), (2,1), (3,0))

ds2Rdd.collect()

res51: Array[(Long, Int)] = Array((0,341), (1,42), (2,43), (3,234))

The first element of each tuple is our join key.

We simply join on it and rearrange the values into the result DataFrame:

val joinedRdd = ds1Rdd.join(ds2Rdd)

val resultrdd = joinedRdd.map(x => x._2)

// resultrdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[204] at map at <console>

And we convert it back to a DataFrame:

resultrdd.toDF("Col1","Col2").show()
+----+----+
|Col1|Col2|
+----+----+
|   0| 341|
|   1|  42|
|   1|  43|
|   0| 234|
+----+----+
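A side note: because both RDDs here come from parallelize over Seqs of the same length, they end up with the same number of partitions and the same number of elements per partition, so RDD.zip can pair the rows directly without building a key or joining. A minimal sketch, assuming that partitioning property holds:

// zip requires both RDDs to have identical partitioning
// (same partition count and same element count per partition);
// otherwise it fails at runtime.
val zippedDf = ds1.as[Int].rdd
  .zip(ds2.as[Int].rdd)
  .toDF("Col1", "Col2")

zippedDf.show()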

Upvotes: 1
