stacksr

Reputation: 1

Spark Join same data set multiple times on different columns

I have the two data sets below.


code,name
IN,India
US,United States
UK,United Kingdom
SG,Singapore 

id,name,code1,code2,code3
1,abc,UK,SG,US
2,efg,SG,UK,US

Can we join code1, code2, and code3 with the first data set and get the name for each column? The expected output is:


id,name,code1desc,code2desc,code3desc
1,abc,United Kingdom,Singapore,United States
2,efg,Singapore,United Kingdom,United States

The join on the first column works; however, the join on the second column fails.

Dataset<Row> code1 = people.join(countries, people.col("code1").equalTo(countries.col("code")), "left_outer")
        .withColumnRenamed("name", "code1desc");
code1.show();

The code below is failing:

Dataset<Row> code2 = code1.join(countries, code1.col("code2").equalTo(countries.col("code")), "left_outer");
code2.show();

Upvotes: 0

Views: 2949

Answers (2)

user238607

Reputation: 2468

You could use a UDF, assuming your country-code dataframe is small enough. First we collect the codes into a map, then apply the UDF to each code column.

Here code_df is your country-code dataframe and data_df is your data.

import org.apache.spark.sql.functions._

val mapcode = code_df.rdd.keyBy(row => row(0)).collectAsMap()
println("Showing 10 rows of mapcode")

for ((k,v) <- mapcode) {
  printf("key: %s, value: %s\n", k, v)
}


// Look up the country name for a code; return null for unknown codes,
// matching the behaviour of a left_outer join instead of throwing.
def getCode(code: String): String =
  mapcode.get(code).map(_.getAs[String](1)).orNull

val getcode_udf = udf(getCode _)

val newdatadf = data_df.withColumn("code1desc", getcode_udf($"code1"))
  .withColumn("code2desc", getcode_udf($"code2"))
  .withColumn("code3desc", getcode_udf($"code3"))

println("Showing 10 rows of final result")
newdatadf.show(10, truncate = false)

Following is the result :

Showing 10 rows of mapcode
key: IN, value: [IN,India]
key: SG, value: [SG,Singapore]
key: UK, value: [UK,United Kingdom]
key: US, value: [US,United States]
Showing 10 rows of final result
+---+----+-----+-----+-----+--------------+--------------+-------------+
|id |name|code1|code2|code3|code1desc     |code2desc     |code3desc    |
+---+----+-----+-----+-----+--------------+--------------+-------------+
|1  |abc |UK   |SG   |US   |United Kingdom|Singapore     |United States|
|2  |efg |SG   |UK   |US   |Singapore     |United Kingdom|United States|
+---+----+-----+-----+-----+--------------+--------------+-------------+
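Stripped of Spark, this approach is just a per-column map lookup. A minimal plain-Scala sketch of the same logic (sample data copied from the question; no Spark required, and None stands in for the nulls a left_outer join would produce):

```scala
// Country code -> name lookup, equivalent to the collected mapcode above.
val codeMap = Map(
  "IN" -> "India",
  "US" -> "United States",
  "UK" -> "United Kingdom",
  "SG" -> "Singapore"
)

// Each row of the people data: (id, name, code1, code2, code3).
val people = List(
  (1, "abc", "UK", "SG", "US"),
  (2, "efg", "SG", "UK", "US")
)

// Resolve every code column through the map; unknown codes yield None.
val resolved = people.map { case (id, name, c1, c2, c3) =>
  (id, name, codeMap.get(c1), codeMap.get(c2), codeMap.get(c3))
}
```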

Upvotes: 0

pasha701

Reputation: 7207

For each of the people dataset's "code[i]" columns, a join with countries is required; this can be done in a loop. In Scala:

// data 
val countries = List(
  ("IN", "India"),
  ("US", "United States"),
  ("UK", "United Kingdom"),
  ("SG", "Singapore")
).toDF("code", "name")

val people = List(
  (1, "abc", "UK", "SG", "US"),
  (2, "efg", "SG", "UK", "US")
).toDF("id", "name", "code1", "code2", "code3")

// action
val countryColumns = List("code1", "code2", "code3")
val result = countryColumns.foldLeft(people)((people, column) =>
  people.alias("p")
    .join(countries.withColumnRenamed("name", column + "desc").alias("c"),
      col("p." + column) === $"c.code",
      "left_outer")
    .drop(column, "code")
)

The result is:

+---+----+--------------+--------------+-------------+
|id |name|code1desc     |code2desc     |code3desc    |
+---+----+--------------+--------------+-------------+
|1  |abc |United Kingdom|Singapore     |United States|
|2  |efg |Singapore     |United Kingdom|United States|
+---+----+--------------+--------------+-------------+

Note: if "countries" dataframe is small, broadcast join can be used for better performance.

Upvotes: 1
