Reputation: 157
I am new to Spark and Scala and would like to know how to perform operations between two DataFrames. In my case I have these two DataFrames:
DF1:
ID_EMPLOYEE  sup_id_1  desc_1  sup_id_2  desc_2  ...  sup_id_18  desc_18  sup_id_19  desc_19
AAAAAAAA     SUP_ID1           SUP_ID2           ...  SUP_ID3              SUP_ID4
BBBBBBBBB    SUP_ID4           SUP_ID6           ...  SUP_ID6              SUP_ID6
CCCCCCCCC    SUP_ID5           SUP_ID5           ...  SUP_ID5              SUP_ID5
DDDDDDDD     SUP_ID7           SUP_ID7           ...  SUP_ID7              SUP_ID7
and
DF2:
Key      Desc
SUP_ID1  Desc1
SUP_ID2  Desc2
SUP_ID3  Desc3
SUP_ID4  Desc4
SUP_ID5  Desc5
SUP_ID6  Desc6
SUP_ID7  Desc7
I would like to fill the desc_* columns of DF1 based on DF2, because in DF1 they are empty: for each sup_id_* column of DF1, look up the matching Key in DF2 and write the corresponding Desc value into the desc_* column of DF1.
I don't know what the easiest way to do this would be. With my current knowledge I can only think of treating the DataFrames as SQL tables and doing as many joins as there are desc_* columns, but that is not the most efficient way.
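For reference, this is roughly the join I would repeat once per desc_* column (19 times in my real case), with df1 and df2 standing for DF1 and DF2 above:
df1.join(df2, df1("sup_id_1") === df2("Key"), "left") // repeated for each sup_id_*/desc_* pair
  .drop("Key", "desc_1")
  .withColumnRenamed("Desc", "desc_1")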
Upvotes: 0
Views: 182
Reputation: 1214
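One option is to generate those joins programmatically: fold over the desc_* column indices and left-join against DF2 once per index, on the matching sup_id_* column, replacing the empty desc_* with DF2's Desc. Self-contained example (with 6 sup_id/desc pairs instead of 19):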
import spark.implicits._
import org.apache.spark.sql.functions.col
case class Source1(
idEmploye: String,
sup_id_1: String,
desc_1: Option[String],
sup_id_2: String,
desc_2: Option[String],
sup_id_3: String,
desc_3: Option[String],
sup_id_4: String,
desc_4: Option[String],
sup_id_5: String,
desc_5: Option[String],
sup_id_6: String,
desc_6: Option[String]
)
val source1 = Seq(
Source1("AAAAAAAA", "SUP_ID1", None, "SUP_ID2", None, "SUP_ID3", None, "SUP_ID4", None, "SUP_ID5", None, "SUP_ID8", None),
Source1("BBBBBBBBB", "SUP_ID4", None, "SUP_ID6", None, "SUP_ID6", None, "SUP_ID6", None, "SUP_ID6", None, "SUP_ID8", None),
Source1("CCCCCCCCC", "SUP_ID5", None, "SUP_ID5", None, "SUP_ID5", None, "SUP_ID5", None, "SUP_ID5", None, "SUP_ID8", None),
Source1("DDDDDDDD", "SUP_ID7", None, "SUP_ID7", None, "SUP_ID7", None, "SUP_ID7", None, "SUP_ID7", None, "SUP_ID8", None)
).toDF()
val source2 = Seq(
("SUP_ID1", "Desc1"),
("SUP_ID2", "Desc2"),
("SUP_ID3", "Desc3"),
("SUP_ID4", "Desc4"),
("SUP_ID5", "Desc5"),
("SUP_ID6", "Desc6"),
("SUP_ID7", "Desc7")
).toDF("Key", "Desc")
// One left join per sup_id_*/desc_* pair: fold over the pair indices,
// threading the growing DataFrame through each join against source2.
val descIndices = 1 to ((source1.columns.length - 1) / 2)
val source12 = descIndices.foldLeft(source1) { (memoDF, i) =>
  memoDF
    .join(source2, memoDF.col(s"sup_id_$i") === source2.col("Key"), "left_outer")
    .drop("Key", s"desc_$i")               // drop the join key and the empty desc_i
    .withColumnRenamed("Desc", s"desc_$i") // the matched Desc becomes the new desc_i
}
// Restore the original column order (each join appended Desc at the end).
val resDF = source12.select(source1.columns.map(col): _*)
resDF.printSchema
// root
// |-- idEmploye: string (nullable = true)
// |-- sup_id_1: string (nullable = true)
// |-- desc_1: string (nullable = true)
// |-- sup_id_2: string (nullable = true)
// |-- desc_2: string (nullable = true)
// |-- sup_id_3: string (nullable = true)
// |-- desc_3: string (nullable = true)
// |-- sup_id_4: string (nullable = true)
// |-- desc_4: string (nullable = true)
// |-- sup_id_5: string (nullable = true)
// |-- desc_5: string (nullable = true)
// |-- sup_id_6: string (nullable = true)
// |-- desc_6: string (nullable = true)
resDF.show(false)
// +---------+--------+------+--------+------+--------+------+--------+------+--------+------+--------+------+
// |idEmploye|sup_id_1|desc_1|sup_id_2|desc_2|sup_id_3|desc_3|sup_id_4|desc_4|sup_id_5|desc_5|sup_id_6|desc_6|
// +---------+--------+------+--------+------+--------+------+--------+------+--------+------+--------+------+
// |AAAAAAAA |SUP_ID1 |Desc1 |SUP_ID2 |Desc2 |SUP_ID3 |Desc3 |SUP_ID4 |Desc4 |SUP_ID5 |Desc5 |SUP_ID8 |null |
// |BBBBBBBBB|SUP_ID4 |Desc4 |SUP_ID6 |Desc6 |SUP_ID6 |Desc6 |SUP_ID6 |Desc6 |SUP_ID6 |Desc6 |SUP_ID8 |null |
// |CCCCCCCCC|SUP_ID5 |Desc5 |SUP_ID5 |Desc5 |SUP_ID5 |Desc5 |SUP_ID5 |Desc5 |SUP_ID5 |Desc5 |SUP_ID8 |null |
// |DDDDDDDD |SUP_ID7 |Desc7 |SUP_ID7 |Desc7 |SUP_ID7 |Desc7 |SUP_ID7 |Desc7 |SUP_ID7 |Desc7 |SUP_ID8 |null |
// +---------+--------+------+--------+------+--------+------+--------+------+--------+------+--------+------+
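If DF2 is small, you can avoid the joins entirely. A minimal sketch, assuming source2 fits comfortably in driver memory and Spark 2.4+ (for element_at): collect it into a Scala Map, embed it in the plan as a map literal with typedLit, and resolve each desc_i by key lookup, which yields null for unmatched keys such as SUP_ID8.
import org.apache.spark.sql.functions.{col, element_at, typedLit}
// Key -> Desc lookup collected to the driver (assumes source2 is small).
val lookup: Map[String, String] = source2
  .collect()
  .map(row => row.getString(0) -> row.getString(1))
  .toMap
// typedLit embeds the Scala Map as a Spark map literal; one withColumn
// per desc_i then does the lookup with no join and no shuffle.
val lookupCol = typedLit(lookup)
val filledDF = descIndices.foldLeft(source1) { (df, i) =>
  df.withColumn(s"desc_$i", element_at(lookupCol, col(s"sup_id_$i")))
}
Since the whole lookup table is embedded in the query plan, this only pays off when DF2 is genuinely small.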
Upvotes: 2