I have two DataFrames, DF1 and DF2, and a JSON file that I need to use as a mapping file to create another DataFrame (DF3).
DF1:
+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
| 100| John| Mumbai|
| 101| Alex| Delhi|
| 104| Divas|Kolkata|
| 108| Jerry|Chennai|
+-------+-------+-------+
DF2:
+-------+-----------+-------+
|column4| column5|column6|
+-------+-----------+-------+
| S1| New| xxx|
| S2| Old| yyy|
| S5|replacement| zzz|
| S10| New| ppp|
+-------+-----------+-------+
Apart from these, I have a mapping file in JSON format that will be used to generate DF3.
Below is the JSON mapping file:
{"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"}
{"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"}
{"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"}
From this mapping file I need to create DF3 with one column for each targetColumn entry. For each entry, if sourceField1 is a column of DF1, the new column should take its values from that DF1 column; otherwise it should take them from the sourceField2 column of DF2.
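The selection rule described above can be expressed without Spark. The following is a minimal plain-Scala sketch; `MappingRule` and `resolve` are hypothetical names, and, like the question, it assumes `sourceField2` always names a column that exists in DF2.

```scala
// Hypothetical standalone version of the mapping rule; no Spark required.
final case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

object MappingRule {
  // Pairs each targetColumn with the column it should be filled from:
  // sourceField1 if DF1 has that column, otherwise sourceField2 (assumed to exist in DF2).
  def resolve(mappings: List[SrcTgtMapping], df1Columns: Set[String]): List[(String, String)] =
    mappings.map { m =>
      val source = if (df1Columns.contains(m.sourceField1)) m.sourceField1 else m.sourceField2
      m.targetColumn -> source
    }

  def main(args: Array[String]): Unit = {
    val mappings = List(
      SrcTgtMapping("newColumn1", "column2", "column4"),
      SrcTgtMapping("newColumn2", "column7", "column5"),
      SrcTgtMapping("newColumn3", "column8", "column6"))
    val df1Columns = Set("column1", "column2", "column3")
    resolve(mappings, df1Columns).foreach(println)
    // prints (newColumn1,column2), (newColumn2,column5), (newColumn3,column6), one per line
  }
}
```

With DF1's columns, only newColumn1 is taken from DF1 (column2). Because column7 and column8 do not exist in DF1, newColumn2 and newColumn3 fall back to column5 and column6 of DF2, which matches the expected output below.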
Below is the expected output.
+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
| John| New| xxx|
| Alex| Old| yyy|
| Divas|replacement| zzz|
| Jerry| New| ppp|
+----------+-----------+----------+
Any help here would be appreciated.
Parse the JSON and create the following List of custom objects:
case class SrcTgtMapping(targetColumn:String,sourceField1:String,sourceField2:String)
val srcTgtMappingList=List(SrcTgtMapping("newColumn1","column2","column4"),SrcTgtMapping("newColumn2","column7","column5"),SrcTgtMapping("newColumn3","column8","column6"))
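The list above is hard-coded rather than parsed. A minimal sketch of the parsing step follows, assuming Spark 2.2 or later. The mapping file has one JSON object per line, which is the JSON Lines layout that `spark.read.json` expects by default, so Spark can read it directly into the case class. `MappingLoader` and the file path mentioned in the comment are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Top-level case class so Spark can derive an Encoder for it.
final case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

object MappingLoader {
  // Each element of jsonLines is one JSON object, i.e. one line of the mapping file.
  def parse(spark: SparkSession, jsonLines: Dataset[String]): List[SrcTgtMapping] = {
    import spark.implicits._
    // Schema is inferred; .as[...] matches the inferred columns to the case class fields by name.
    spark.read.json(jsonLines).as[SrcTgtMapping].collect().toList
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mapping-loader").getOrCreate()
    import spark.implicits._
    // For a real file, spark.read.json("mapping.json").as[SrcTgtMapping] works the same way
    // (the path is hypothetical).
    val lines = Seq(
      """{"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"}""",
      """{"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"}""",
      """{"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"}""").toDS()
    val srcTgtMappingList = parse(spark, lines)
    srcTgtMappingList.foreach(println)
    spark.stop()
  }
}
```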
Add a dummy index column to both DataFrames and join them on that index column:
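import org.apache.spark.sql.functions._
// Unique, increasing but not consecutive IDs (the partition ID is encoded in them), so df1 and df2 line up
// only if both have the same rows per partition. Replaces the deprecated monotonicallyIncreasingId.
val df1WithIndex=df1.withColumn("index",monotonically_increasing_id())
val df2WithIndex=df2.withColumn("index",monotonically_increasing_id())
// Joining on Seq("index") keeps a single "index" column instead of two
val joinedDf=df1WithIndex.join(df2WithIndex,Seq("index"))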
Build the select expressions from the mapping and execute the query:
val df1Columns=df1WithIndex.columns.toList
// Use sourceField1 when DF1 has that column, otherwise fall back to sourceField2 from DF2
val query=srcTgtMappingList.map(stm=>
  if(df1Columns.contains(stm.sourceField1)) joinedDf.col(stm.sourceField1).alias(stm.targetColumn)
  else joinedDf.col(stm.sourceField2).alias(stm.targetColumn))
val output=joinedDf.select(query:_*)
output.show()
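Sample output (the rows are paired correctly, but a join does not preserve row order, so Jerry and Divas appear swapped compared with the expected output):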
+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
| John| New| xxx|
| Alex| Old| yyy|
| Jerry| New| ppp|
| Divas|replacement| zzz|
+----------+-----------+----------+
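If the two DataFrames may be partitioned differently, a generated ID is not a reliable row position. One alternative, sketched here and not part of the approach above, is `RDD.zipWithIndex`, which assigns consecutive indices 0 to n-1 following partition order. The sketch assumes Spark 2.x/3.x with spark-sql on the classpath. `PositionalMerge` and `withRowIndex` are hypothetical names, and the final select hard-codes the columns that the mapping resolves to. It rebuilds the sample DataFrames, indexes both, joins on the index, and sorts by it so the output keeps the original row order.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object PositionalMerge {
  // Appends a consecutive 0-based row number that follows the DataFrame's current partition order.
  def withRowIndex(df: DataFrame, name: String = "index"): DataFrame = {
    val schema = StructType(df.schema.fields :+ StructField(name, LongType, nullable = false))
    val rows = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
    df.sparkSession.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("positional-merge").getOrCreate()
    import spark.implicits._

    val df1 = Seq((100, "John", "Mumbai"), (101, "Alex", "Delhi"),
      (104, "Divas", "Kolkata"), (108, "Jerry", "Chennai")).toDF("column1", "column2", "column3")
    val df2 = Seq(("S1", "New", "xxx"), ("S2", "Old", "yyy"),
      ("S5", "replacement", "zzz"), ("S10", "New", "ppp")).toDF("column4", "column5", "column6")

    // Join on the shared index, then sort by it so rows keep their original order.
    val joined = withRowIndex(df1).join(withRowIndex(df2), Seq("index")).orderBy("index")
    // Columns chosen per the mapping rule (hard-coded here for brevity).
    joined.select($"column2".as("newColumn1"), $"column5".as("newColumn2"), $"column6".as("newColumn3")).show()
    spark.stop()
  }
}
```

One trade-off is that `zipWithIndex` triggers an extra Spark job when the RDD has more than one partition, because it first has to count the rows in each partition.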
Hope this approach helps.