MladenB

Reputation: 161

Advanced join of two DataFrames in Spark Scala

I have to join two Dataframes.

Sample: Dataframe1 looks like this

df1_col1      df1_col2
   a            ex1
   b            ex4
   c            ex2
   d            ex6
   e            ex3

Dataframe2

df2_col1      df2_col2
   1           a,b,c
   2           d,c,e
   3           a,e,c

The resulting DataFrame should look like this

res_col1      res_col2       res_col3
    a           ex1             1
    a           ex1             3
    b           ex4             1
    c           ex2             1
    c           ex2             2
    c           ex2             3
    d           ex6             2
    e           ex3             2
    e           ex3             3

What is the best way to achieve this join?

Upvotes: 1

Views: 252

Answers (3)

Mahesh Gupta

Reputation: 1892

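I used the Spark Scala DataFrame API to get your desired output.

// spark-shell normally imports these already; the import is only needed elsewhere.
import org.apache.spark.sql.functions.{explode, split}

val df1 = sc.parallelize(Seq(("a","ex1"),("b","ex4"),("c","ex2"),("d","ex6"),("e","ex3"))).toDF("df1_col1","df1_col2")

val df2 = sc.parallelize(Seq((1,"a,b,c"),(2,"d,c,e"),(3,"a,e,c"))).toDF("df2_col1","df2_col2")

// Split the comma-separated column and explode it into one row per key.
val exploded = df2.withColumn("_tmp", explode(split($"df2_col2", ",")))

// Join on the exploded key, then drop the helper column and the original list column.
exploded.join(df1, $"_tmp" === $"df1_col1", "inner").drop("_tmp", "df2_col2").show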

Output

+--------+--------+--------+
|df2_col1|df1_col1|df1_col2|
+--------+--------+--------+
|       2|       e|     ex3|
|       3|       e|     ex3|
|       2|       d|     ex6|
|       1|       c|     ex2|
|       2|       c|     ex2|
|       3|       c|     ex2|
|       1|       b|     ex4|
|       1|       a|     ex1|
|       3|       a|     ex1|
+--------+--------+--------+

Rename the columns as required.
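As a rough sketch of that renaming step, the joined columns can be selected under the names the question asks for (res_col1, res_col2, res_col3) and sorted to match the expected table. The object name RenameJoinedColumns, the helper joined, and the local SparkSession settings are assumptions made for illustration, not part of the answer above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, split}

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object RenameJoinedColumns {

  // Builds the two sample DataFrames from the question and returns the joined result.
  def joined(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df1 = Seq(("a", "ex1"), ("b", "ex4"), ("c", "ex2"), ("d", "ex6"), ("e", "ex3"))
      .toDF("df1_col1", "df1_col2")
    val df2 = Seq((1, "a,b,c"), (2, "d,c,e"), (3, "a,e,c"))
      .toDF("df2_col1", "df2_col2")

    df2
      // One row per comma-separated key.
      .withColumn("_tmp", explode(split(col("df2_col2"), ",")))
      .join(df1, col("_tmp") === col("df1_col1"), "inner")
      // Select and rename in the order requested in the question.
      .select(
        col("df1_col1").as("res_col1"),
        col("df1_col2").as("res_col2"),
        col("df2_col1").as("res_col3"))
      .orderBy("res_col1", "res_col3")
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for running the sketch outside a cluster.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rename-joined-columns")
      .getOrCreate()
    joined(spark).show()
    spark.stop()
  }
}
```

Using select with aliases instead of drop also fixes the column order, so no separate reordering is needed.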


Upvotes: 0

MladenB

Reputation: 161

I used Spark SQL for this join; here is the relevant part of the code:

df1.createOrReplaceTempView("temp_v_df1")
df2.createOrReplaceTempView("temp_v_df2")
val df_result = spark.sql("""select 
                    |   b.df1_col1 as res_col1, 
                    |   b.df1_col2 as res_col2, 
                    |   a.df2_col1 as res_col3  
                    |   from (select df2_col1, exp_col 
                    |         from temp_v_df2 
                    |        lateral view explode(split(df2_col2,",")) dummy as exp_col) a
                    |   join temp_v_df1 b on a.exp_col = b.df1_col1""".stripMargin)

Upvotes: 1

Sc0rpion

Reputation: 73

I have updated the code below

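// Both inputs must be DataFrames; without toDF, df1 is an RDD and has no select.
val df1 = sc.parallelize(Seq(("a","ex1"),("b","ex4"),("c","ex2"),("d","ex6"),("e","ex3"))).toDF
// Pass the tuples directly (not wrapped in a List) so toDF yields columns _1 and _2.
val df2 = sc.parallelize(Seq(("1","a,b,c"),("2","d,c,e"),("3","a,e,c"))).toDF
val exploded = df2.withColumn("df2_col2_explode", explode(split($"_2", ","))).select($"_1".as("df2_col1"), $"df2_col2_explode")
exploded.join(df1.select($"_1".as("df1_col1"), $"_2".as("df1_col2")), $"df1_col1" === $"df2_col2_explode", "inner").show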

You just need to split the values and generate multiple rows by exploding it and then join with the other dataframe.
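To make the split, explode and join steps concrete, here is a small model of the same logic on plain Scala collections, with no Spark involved. The object name ExplodeJoinModel is hypothetical, and the Map lookup assumes the keys in df1_col1 are unique, which holds for the sample data but is not guaranteed in general.

```scala
// Hypothetical model of explode + inner join using only the standard library.
object ExplodeJoinModel {
  // Sample rows from the question: (df1_col1, df1_col2) and (df2_col1, df2_col2).
  val df1: Seq[(String, String)] =
    Seq(("a", "ex1"), ("b", "ex4"), ("c", "ex2"), ("d", "ex6"), ("e", "ex3"))
  val df2: Seq[(Int, String)] =
    Seq((1, "a,b,c"), (2, "d,c,e"), (3, "a,e,c"))

  // "Explode": turn each (id, "a,b,c") row into one (id, key) row per key.
  val exploded: Seq[(Int, String)] =
    df2.flatMap { case (id, csv) => csv.split(",").toSeq.map(key => (id, key)) }

  // Assumption: df1_col1 values are unique, so a Map can stand in for the join side.
  val lookup: Map[String, String] = df1.toMap

  // Inner join: keep only exploded keys that exist in df1, then sort for display.
  val result: Seq[(String, String, Int)] =
    exploded
      .flatMap { case (id, key) => lookup.get(key).map(value => (key, value, id)) }
      .sorted

  def main(args: Array[String]): Unit =
    result.foreach(println)
}
```

The nine tuples in result correspond to the rows of the expected table in the question. In Spark, explode plays the role of the first flatMap and the join replaces the Map lookup.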

You can also refer to this related question: How to split pipe-separated column into multiple rows?

Upvotes: 1
