Reputation: 1
I have the two datasets below.
code,name
IN,India
US,United States
UK,United Kingdom
SG,Singapore
id,name,code1,code2,code3
1,abc,UK,SG,US
2,efg,SG,UK,US
Can I join code1, code2, and code3 with the first dataset and get the name for each column? The expected output is:
id,name,code1desc,code2desc,code3desc
1,abc,United Kingdom,Singapore,United States
2,efg,Singapore,United Kingdom,United States
The join on the first column works; however, the join on the second column fails.
Dataset<Row> code1 = people.join(countries, people.col("code1").equalTo(countries.col("code")),"left_outer").withColumnRenamed("name","code1desc");
code1.show();
The code below fails:
Dataset<Row> code2 = code1.join(countries, code1.col("code2").equalTo(countries.col("code")),"left_outer");
code2.show();
Upvotes: 0
Views: 2949
Reputation: 2468
You could use a UDF, assuming your country-code DataFrame is small enough. First, collect the codes into a map on the driver, then apply the UDF to each code column. Here, code_df is your country-code DataFrame and data_df is your data.
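For reference, here is a minimal sketch of how code_df and data_df could be built from the sample data above (an assumption for illustration: an in-memory setup via toDF on a SparkSession named spark, rather than reading the CSV files):

import spark.implicits._

// Hypothetical in-memory setup of the two DataFrames from the sample data
val code_df = Seq(
  ("IN", "India"),
  ("US", "United States"),
  ("UK", "United Kingdom"),
  ("SG", "Singapore")
).toDF("code", "name")

val data_df = Seq(
  (1, "abc", "UK", "SG", "US"),
  (2, "efg", "SG", "UK", "US")
).toDF("id", "name", "code1", "code2", "code3")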
import org.apache.spark.sql.functions._
import spark.implicits._   // for the $"..." column syntax

// Collect the country codes into a local map on the driver: code -> full row
val mapcode = code_df.rdd.keyBy(row => row(0)).collectAsMap()

println("Showing 10 rows of mapcode")
for ((k, v) <- mapcode) {
  printf("key: %s, value: %s\n", k, v)
}

// Look up the country name (second field of the mapped row) for a given code
def getCode(code: String): String = {
  mapcode(code).getAs[String](1)
}

val getcode_udf = udf(getCode _)

// Apply the UDF to each code column
val newdatadf = data_df
  .withColumn("code1desc", getcode_udf($"code1"))
  .withColumn("code2desc", getcode_udf($"code2"))
  .withColumn("code3desc", getcode_udf($"code3"))

println("Showing 10 rows of final result")
newdatadf.show(10, truncate = false)
The result is:
Showing 10 rows of mapcode
key: IN, value: [IN,India]
key: SG, value: [SG,Singapore]
key: UK, value: [UK,United Kingdom]
key: US, value: [US,United States]
Showing 10 rows of final result
+---+----+-----+-----+-----+--------------+--------------+-------------+
|id |name|code1|code2|code3|code1desc |code2desc |code3desc |
+---+----+-----+-----+-----+--------------+--------------+-------------+
|1 |abc |UK |SG |US |United Kingdom|Singapore |United States|
|2 |efg |SG |UK |US |Singapore |United Kingdom|United States|
+---+----+-----+-----+-----+--------------+--------------+-------------+
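Note that mapcode(code) throws a NoSuchElementException when a code is missing from the map. If you want behaviour closer to the left outer join in the question (null for unknown codes), a sketch of a null-safe lookup could look like this (getCodeSafe is a hypothetical name, not part of the code above):

// Sketch: return null for codes that are absent from the map,
// mirroring left-outer-join semantics
def getCodeSafe(code: String): String =
  mapcode.get(code).map(_.getAs[String](1)).orNull

val getcode_safe_udf = udf(getCodeSafe _)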
Upvotes: 0
Reputation: 7207
For each of the people's "code[i]" columns, a join with countries is required; this can be done in a loop in Scala:
import org.apache.spark.sql.functions.col
import spark.implicits._   // for toDF and the $"..." syntax

// data
val countries = List(
  ("IN", "India"),
  ("US", "United States"),
  ("UK", "United Kingdom"),
  ("SG", "Singapore")
).toDF("code", "name")

val people = List(
  (1, "abc", "UK", "SG", "US"),
  (2, "efg", "SG", "UK", "US")
).toDF("id", "name", "code1", "code2", "code3")

// action: one left outer join per code column, renaming countries' "name" to "<column>desc"
val countryColumns = List("code1", "code2", "code3")
val result = countryColumns.foldLeft(people)((people, column) =>
  people.alias("p")
    .join(countries.withColumnRenamed("name", column + "desc").alias("c"),
      col("p." + column) === $"c.code",
      "left_outer")
    .drop(column, "code")
)
The result is:
+---+----+--------------+--------------+-------------+
|id |name|code1desc |code2desc |code3desc |
+---+----+--------------+--------------+-------------+
|1 |abc |United Kingdom|Singapore |United States|
|2 |efg |Singapore |United Kingdom|United States|
+---+----+--------------+--------------+-------------+
Note: if the "countries" DataFrame is small, a broadcast join can be used for better performance, as sketched below.
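A sketch of that, building on the fold above (broadcast is the standard hint from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.broadcast

// Sketch: hint Spark to broadcast the small countries DataFrame to every executor
val resultBroadcast = countryColumns.foldLeft(people)((people, column) =>
  people.alias("p")
    .join(broadcast(countries.withColumnRenamed("name", column + "desc")).alias("c"),
      col("p." + column) === $"c.code",
      "left_outer")
    .drop(column, "code")
)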
Upvotes: 1