Reputation: 35
Given two DataFrames.
DF1 is:
|emp_id|emp_site |emp_name
|1 |Washigton | [Will, Smith]
|2 |null | null
|3 |New York | [Norman, Smith]
|4 |Iowa | [Ian, Smith]
DF2 is:
|emp_id|emp_site |emp_name
|1 |Washigton | [Watson, Smith]
|2 |Wisconsin | [Sam, Robinson]
|3 |New York | null
|4 |Illinois | [Ican, Robinson]
|5 |Pennsylvania | [Patrick, Robinson]
The expected result is DF3, a join of DF1 and DF2 on the emp_id column (an outer join such that all DF2 records are available). Every column of DF3 except emp_id should be represented as 'from', 'to' and 'change', but ONLY IF the respective column values in DF1 and DF2 differ (NOTE: if the values in DF1 and DF2 are equal, the column should be null).
Note: 'from' and 'to' are self-explanatory and hold the values from DF1 and DF2 respectively. 'change' is 'insert' if the 'from' value is null; otherwise it is 'update'.
|emp_id|emp_site |emp_name
|1 |null |[from -> [Will, Smith], to -> [Watson, Smith], change->update]
|2 |[to->Wisconsin, change->insert] |[to -> [Sam, Robinson], change->insert]
|3 |null |[from -> [Norman, Smith], change->update]
|4 |[from ->Iowa, to -> Illinois, change->update] |[from ->[Ian, Smith], to -> [Ican, Robinson], change-> update]
|5 |[to -> Pennsylvania, change->insert] | [to -> [Patrick, Robinson], change->insert]
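The per-cell rule described in the note above can be sketched in plain Scala, independent of Spark. This is only an illustration of the rule as stated in the question; `ChangeRule`, `Change` and `diffCell` are hypothetical names, and `None` stands in for a null cell.

```scala
// Hypothetical helper names; not part of Spark.
object ChangeRule {
  // One diff cell: 'from' / 'to' are None when that side is null.
  final case class Change[A](from: Option[A], to: Option[A], change: String)

  // Returns None when both sides are equal (including both null);
  // otherwise 'insert' if 'from' is null, else 'update'.
  def diffCell[A](from: Option[A], to: Option[A]): Option[Change[A]] =
    if (from == to) None
    else Some(Change(from, to, if (from.isEmpty) "insert" else "update"))
}
```

Because the function is generic in `A`, the same rule applies whether a cell holds a string or a nested value such as `Seq("Will", "Smith")`.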
I had no success building the desired DF3 with a Map, mainly because the column types are not always Strings: 'from' and 'to' can hold struct types, depending on the column types in DF1 and DF2. I tried a map data structure for 'from', 'to' and 'change' because DF3 eventually needs to be translated to JSON.
Any help is much appreciated.
Upvotes: 0
Views: 331
Reputation: 6338
Try this:
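// Assumes a SparkSession named `spark` is in scope (as in spark-shell).
import org.apache.spark.sql.functions._
import spark.implicits._

val data =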
"""
|emp_id|emp_site |emp_name
|1 |Washigton | Will
|2 |null | null
|3 |New York | Norman
|4 |Iowa | Ian
""".stripMargin
val stringDS = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df1 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS)
df1.show(false)
df1.printSchema()
/**
* +------+---------+--------+
* |emp_id|emp_site |emp_name|
* +------+---------+--------+
* |1 |Washigton|Will |
* |2 |null |null |
* |3 |New York |Norman |
* |4 |Iowa |Ian |
* +------+---------+--------+
*
* root
* |-- emp_id: integer (nullable = true)
* |-- emp_site: string (nullable = true)
* |-- emp_name: string (nullable = true)
*/
val data1 =
"""
|emp_id|emp_site |emp_name
|1 |Washigton | Watson
|2 |Wisconsin | Sam
|3 |New York | null
|4 |Illinois | Ican
|5 |Pennsylvania | Patrick
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df2 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS1)
df2.show(false)
df2.printSchema()
/**
* +------+------------+--------+
* |emp_id|emp_site |emp_name|
* +------+------------+--------+
* |1 |Washigton |Watson |
* |2 |Wisconsin |Sam |
* |3 |New York |null |
* |4 |Illinois |Ican |
* |5 |Pennsylvania|Patrick |
* +------+------------+--------+
*
* root
* |-- emp_id: integer (nullable = true)
* |-- emp_site: string (nullable = true)
* |-- emp_name: string (nullable = true)
*/
val joiningKey = "emp_id"
// Pair the non-key columns of df1 and df2 by position and build one diff column per pair.
val cols =
  df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
    .map { c =>
      val (df1Col, df2Col) = df1.col(c._1) -> df2.col(c._2)
      // Value only in df2: insert
      when(df1Col.isNull && df2Col.isNotNull,
        array(map(lit("to"), df2Col), map(lit("change"), lit("insert"))))
        // Value only in df1: delete
        .when(df1Col.isNotNull && df2Col.isNull,
          array(map(lit("from"), df1Col), map(lit("change"), lit("delete"))))
        // Same value on both sides, or null on both sides: no change
        .when(df1Col.isNotNull && df2Col.isNotNull && df1Col === df2Col,
          lit(null))
        .when(df1Col.isNull && df2Col.isNull,
          lit(null))
        // Both present but different: update
        .otherwise(array(map(lit("from"), df1Col), map(lit("to"), df2Col), map(lit("change"), lit("update"))))
        .as(c._1)
    }
df1.join(df2, Seq(joiningKey), "outer")
  .select(cols ++ Seq(col(colName = joiningKey)): _*)
  .orderBy(joiningKey)
  .show(false)
/**
* +------------------------------------------------------+----------------------------------------------------+------+
* |emp_site |emp_name |emp_id|
* +------------------------------------------------------+----------------------------------------------------+------+
* |null |[[from -> Will], [to -> Watson], [change -> update]]|1 |
* |[[to -> Wisconsin], [change -> insert]] |[[to -> Sam], [change -> insert]] |2 |
* |null |[[from -> Norman], [change -> delete]] |3 |
* |[[from -> Iowa], [to -> Illinois], [change -> update]]|[[from -> Ian], [to -> Ican], [change -> update]] |4 |
* |[[to -> Pennsylvania], [change -> insert]] |[[to -> Patrick], [change -> insert]] |5 |
* +------------------------------------------------------+----------------------------------------------------+------+
*/
// in case column is not of type string
// Note: changeExpr is wrapped in quotes, so passing "null" yields the string 'null', not SQL NULL.
val getExpr = (fromExpr: String, toExpr: String, changeExpr: String) =>
  s"named_struct('from', $fromExpr, 'to', $toExpr, 'change', '$changeExpr')"
val cols1 =
  df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
    .map { c =>
      // Refer to each side through the aliases given in the join below.
      val (c1, c2) = s"df1.${c._1}" -> s"df2.${c._2}"
      when(expr(s"$c1 is null and $c2 is not null"), expr(getExpr("null", c2, "insert")))
        .when(expr(s"$c1 is not null and $c2 is null"), expr(getExpr(c1, "null", "delete")))
        .when(expr(s"$c1 is not null and $c2 is not null and $c1=$c2"), expr(getExpr("null", "null", "null")))
        .when(expr(s"$c1 is null and $c2 is null"), expr(getExpr("null", "null", "null")))
        .otherwise(expr(getExpr(c1, c2, "update")))
        .as(c._1)
    }
val processedDF = df1.as("df1").join(df2.as("df2"), Seq(joiningKey), "outer")
  .select(cols1 ++ Seq(col(colName = joiningKey)): _*)
  .orderBy(joiningKey)
processedDF.show(false)
processedDF.printSchema()
/**
* +------------------------+----------------------+------+
* |emp_site |emp_name |emp_id|
* +------------------------+----------------------+------+
* |[,, null] |[Will, Watson, update]|1 |
* |[, Wisconsin, insert] |[, Sam, insert] |2 |
* |[,, null] |[Norman,, delete] |3 |
* |[Iowa, Illinois, update]|[Ian, Ican, update] |4 |
* |[, Pennsylvania, insert]|[, Patrick, insert] |5 |
* +------------------------+----------------------+------+
*
* root
* |-- emp_site: struct (nullable = false)
* | |-- from: string (nullable = true)
* | |-- to: string (nullable = true)
* | |-- change: string (nullable = false)
* |-- emp_name: struct (nullable = false)
* | |-- from: string (nullable = true)
* | |-- to: string (nullable = true)
* | |-- change: string (nullable = false)
* |-- emp_id: integer (nullable = true)
*/
Please note that if 'from' and 'to' are not present, I'm using 'change' as 'null'; you can change that to something else, like 'no-op'.
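If the whole column should be null when nothing changed, as the question asks, one possible variant is sketched below. It is an assumption-laden sketch rather than a tested drop-in: `DiffSketch` and `diff` are hypothetical names, both frames are assumed to share the same non-key column names, and the column types are assumed to support equality comparison in Spark. It follows the question's rule ('insert' when 'from' is null, otherwise 'update'), so it does not emit 'delete'.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, struct, when}

object DiffSketch {
  // Hypothetical helper: one nullable from/to/change struct per non-key column.
  def diff(left: DataFrame, right: DataFrame, key: String): DataFrame = {
    val diffCols: Seq[Column] = left.columns.toSeq.filterNot(_ == key).map { name =>
      val from = col(s"l.$name")
      val to = col(s"r.$name")
      // <=> is null-safe equality, so null vs null counts as "no change".
      // A when(...) without otherwise(...) yields null for rows that do not match.
      when(!(from <=> to),
        struct(
          from.as("from"),
          to.as("to"),
          when(from.isNull, lit("insert")).otherwise(lit("update")).as("change")
        )).as(name)
    }
    left.as("l").join(right.as("r"), Seq(key), "outer")
      .select(col(key) +: diffCols: _*)
  }
}
```

With the DataFrames above, this could be used as `DiffSketch.diff(df1, df2, "emp_id").orderBy("emp_id").toJSON.show(false)`, where `toJSON` turns each row into a JSON string.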
Upvotes: 0