alexanoid

Reputation: 25892

Apache Spark SQL query and DataFrame as reference data

I have two Spark DataFrames:

cities DataFrame with the following column:

city
-----
London
Austin

bigCities DataFrame with the following column:

name
------
London
Cairo

I need to add a Boolean column, bigCity, to the cities DataFrame. Its value must be computed from the condition "cities.city IN bigCities.name".

I can do this in the following way (with a static bigCities collection):

cities.createOrReplaceTempView("cities")

val resultDf = spark.sql("SELECT city, CASE WHEN city IN ('London', 'Cairo') THEN 'Y' ELSE 'N' END AS bigCity FROM cities")

but I don't know how to replace the static list ('London', 'Cairo') with the bigCities DataFrame in the query. I want to use bigCities as the reference data in the query.

Please advise how to achieve this.

Upvotes: 1

Views: 1690

Answers (2)

stack0114106

Reputation: 8811

You can use collect_list() on the bigCities table and test membership with array_contains(). For example:

scala> val df_city = Seq(("London"),("Austin")).toDF("city")
df_city: org.apache.spark.sql.DataFrame = [city: string]

scala> val df_bigCities = Seq(("London"),("Cairo")).toDF("name")
df_bigCities: org.apache.spark.sql.DataFrame = [name: string]

scala> df_city.createOrReplaceTempView("cities")

scala> df_bigCities.createOrReplaceTempView("bigCities")

scala> spark.sql(" select city, case when array_contains((select collect_list(name) from bigcities),city) then 'Y' else 'N' end as bigCity from cities").show(false)
+------+-------+
|city  |bigCity|
+------+-------+
|London|Y      |
|Austin|N      |
+------+-------+



If bigCities contains many duplicate names, use collect_set instead: it removes the duplicates, so the array that array_contains has to scan is smaller.

scala> spark.sql(" select city, case when array_contains((select collect_set(name) from bigcities),city) then 'Y' else 'N' end as bigCity from cities").show(false)
+------+-------+
|city  |bigCity|
+------+-------+
|London|Y      |
|Austin|N      |
+------+-------+
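
The same idea can also be sketched with the DataFrame API instead of SQL: collect the distinct names to the driver and pass them to Column.isin. This is only a sketch under assumptions that are not part of the answer above: the reference list is small enough to fit in driver memory, and the object name BigCityIsin, the helper flagBigCities, and the local[*] master are illustrative choices. It yields a true Boolean column rather than 'Y'/'N'.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object BigCityIsin {
  // Hypothetical helper: adds a Boolean bigCity column to `cities`.
  def flagBigCities(cities: DataFrame, bigCities: DataFrame): DataFrame = {
    // Assumption: the distinct reference names fit in driver memory.
    val bigNames: Seq[String] =
      bigCities.select("name").distinct().collect().map(_.getString(0)).toSeq
    // isin takes varargs, so the collected names are expanded with `: _*`.
    cities.withColumn("bigCity", col("city").isin(bigNames: _*))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run for illustration
      .appName("BigCityIsin")
      .getOrCreate()
    import spark.implicits._

    val cities = Seq("London", "Austin").toDF("city")
    val bigCities = Seq("London", "Cairo").toDF("name")

    flagBigCities(cities, bigCities).show(false)
    spark.stop()
  }
}
```

If the reference data is too large to collect, a join against bigCities is likely the safer choice.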



Upvotes: 2

Gofrette

Reputation: 478

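import org.apache.spark.sql.functions.when
import spark.implicits._ // enables the $"col" syntax
// Left outer join keeps every city; name is null where bigCities has no match.
val df = cities.join(bigCities, $"name".equalTo($"city"), "leftouter").
                withColumn("bigCity", when($"name".isNull, "N").otherwise("Y")).
                drop("name")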
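
One caveat with a plain left outer join is that a name appearing more than once in bigCities would duplicate the matching rows of cities. A hedged variant of the join above deduplicates the reference column first and returns a Boolean column, as the question asks. The object name BigCityJoin, the helper flagBigCities, and the broadcast hint are assumptions for illustration (the hint only makes sense if bigCities is small), and the sketch assumes cities has no column called name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col}

object BigCityJoin {
  // Hypothetical helper: flags each city that appears in bigCities.name.
  def flagBigCities(cities: DataFrame, bigCities: DataFrame): DataFrame = {
    // Deduplicate so the join cannot multiply rows of `cities`.
    val ref = bigCities.select(col("name")).distinct()
    cities
      // Assumption: `ref` is small enough to broadcast to every executor.
      .join(broadcast(ref), col("city") === col("name"), "left_outer")
      // A non-null name means the city matched a reference row.
      .withColumn("bigCity", col("name").isNotNull)
      .drop("name")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run for illustration
      .appName("BigCityJoin")
      .getOrCreate()
    import spark.implicits._

    val cities = Seq("London", "Austin").toDF("city")
    val bigCities = Seq("London", "Cairo", "London").toDF("name")

    flagBigCities(cities, bigCities).show(false)
    spark.stop()
  }
}
```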

Upvotes: 3
