Divas Nikhra

Reputation: 91

How to use a JSON mapping file to generate a new DataFrame in Spark using Scala

I have two DataFrames, DF1 and DF2, and a JSON file which I need to use as a mapping file to create another DataFrame (DF3).

DF1:

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|    100|   John| Mumbai|
|    101|   Alex|  Delhi|
|    104|  Divas|Kolkata|
|    108|  Jerry|Chennai|
+-------+-------+-------+

DF2:

+-------+-----------+-------+
|column4|    column5|column6|
+-------+-----------+-------+
|     S1|        New|    xxx|
|     S2|        Old|    yyy|
|     S5|replacement|    zzz|
|    S10|        New|    ppp|
+-------+-----------+-------+

Apart from these, I have a mapping file in JSON format which will be used to generate DF3.

Below is the JSON mapping file:

{"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"}
{"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"}
{"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"}

So, from this JSON file, I need to create DF3 with the columns listed under targetColumn in the mapping. For each mapping entry, if sourceField1 is present in DF1, the target column should be populated from that DF1 column; otherwise it should be populated from sourceField2 in DF2.

Below is the expected output.

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Divas|replacement|       zzz|
|     Jerry|        New|       ppp|
+----------+-----------+----------+

Any help here will be appreciated.

Upvotes: 0

Views: 262

Answers (1)

Manoj Kumar Dhakad

Reputation: 1892

Parse the JSON mapping file and create the below List of custom objects:

case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

val srcTgtMappingList = List(
  SrcTgtMapping("newColumn1", "column2", "column4"),
  SrcTgtMapping("newColumn2", "column7", "column5"),
  SrcTgtMapping("newColumn3", "column8", "column6"))
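The list above is hardcoded for illustration. Since the mapping file holds one JSON object per line, you could also derive it directly from the file. A minimal sketch, assuming the file lives at a hypothetical path mapping.json and a SparkSession named spark is in scope:

import spark.implicits._

// Read the line-delimited JSON mapping file and map each row onto the case class.
// "mapping.json" is a placeholder path, not from the original question.
val srcTgtMappingList: List[SrcTgtMapping] = spark.read
  .json("mapping.json")
  .as[SrcTgtMapping]
  .collect()
  .toList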

Add a dummy index column to both DataFrames and join them on that column:

import org.apache.spark.sql.functions._

val df1WithIndex = df1.withColumn("index", monotonically_increasing_id())
val df2WithIndex = df2.withColumn("index", monotonically_increasing_id())
val joinedDf = df1WithIndex.join(df2WithIndex, df1WithIndex.col("index") === df2WithIndex.col("index"))
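One caveat: monotonically_increasing_id() only guarantees increasing, unique values, so on multi-partition data the indexes generated for df1 and df2 are not guaranteed to line up row for row. A more robust sketch uses RDD.zipWithIndex, which assigns consecutive 0-based indexes; the helper addRowIndex below is hypothetical, not part of the original answer:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: append a consecutive 0-based "index" column via zipWithIndex.
def addRowIndex(df: DataFrame): DataFrame = {
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(indexed, StructType(df.schema.fields :+ StructField("index", LongType)))
}

// Drop-in replacement for the two withColumn calls and the join above.
val joinedDf = addRowIndex(df1).join(addRowIndex(df2), "index")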

Create the query and execute it.

val df1Columns = df1WithIndex.columns.toList
val df2Columns = df2WithIndex.columns.toList

// For each mapping entry, pick sourceField1 if it exists in DF1,
// otherwise fall back to sourceField2 from DF2, aliased to the target name.
val query = srcTgtMappingList.map { stm =>
  if (df1Columns.contains(stm.sourceField1))
    joinedDf.col(stm.sourceField1).alias(stm.targetColumn)
  else
    joinedDf.col(stm.sourceField2).alias(stm.targetColumn)
}

val output = joinedDf.select(query: _*)
output.show

Sample Output:

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Jerry|        New|       ppp|
|     Divas|replacement|       zzz|
+----------+-----------+----------+

Hope this approach will help you.

Upvotes: 1
