alexanoid

Reputation: 25872

How to refer to a broadcast variable in Spark DataFrame SQL

I have the following SparkSQL:

val resultDf = spark.sql("SELECT name, phone, country FROM users")

I'd like to filter the returned records down to the countries present in the following collection:

val countries = Seq("Italy", "France", "United States", "Poland", "Spain")

For example, I can create a broadcast variable from the collection:

val countriesBroadcast = sc.broadcast(countries)

but is it possible (and if so, how?) to use the countriesBroadcast variable inside my SQL query?

Upvotes: 1

Views: 3281

Answers (2)

Bhima Rao Gogineni

Reputation: 267

In the Spark DataFrame API, we can broadcast an entire table and join it with the target table to get the desired output. Here is some example code.

Imports

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

Code

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.read.option("header", true).csv("data/user.txt")
df.createOrReplaceTempView("users")
val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
import spark.implicits._
spark.sparkContext.parallelize(countries, 1).toDF("country").createOrReplaceTempView("countries")
broadcast(spark.table("countries")).join(spark.table("users"), "country").show()

Contents of the "data/user.txt" file:

 name,phone,country
 a,123,India
 b,234,Italy
 c,526,France
 d,765,India

Code output:

+-------+----+-----+
|country|name|phone|
+-------+----+-----+
|  Italy|   b|  234|
| France|   c|  526|
+-------+----+-----+

Note: the code was tested with Spark 2.2 and Scala 2.11.

Upvotes: 1

user10628251

Reputation: 21

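It is not possible to reference a broadcast variable directly in a SQL query. The exceptions are UserDefinedFunctions, UserDefinedAggregateFunctions and Aggregators (i.e. non-declarative code), which can read the variable from their own code.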
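To illustrate the UDF exception, here is a minimal sketch, not a definitive implementation. It assumes a local SparkSession and a small in-memory stand-in for the users table; the UDF name in_countries is hypothetical and chosen only for this example. The broadcast value is converted to a Set so that each lookup is cheap.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session, just for trying this out.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-udf-sketch").getOrCreate()
import spark.implicits._

// Assumption: illustrative stand-in for the asker's "users" table.
Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"))
  .toDF("name", "phone", "country")
  .createOrReplaceTempView("users")

val countries = Seq("Italy", "France", "United States", "Poland", "Spain")

// Broadcast the collection as a Set so that membership checks are fast.
val countriesBroadcast = spark.sparkContext.broadcast(countries.toSet)

// Register a UDF (hypothetical name) whose closure reads the broadcast value.
// String arguments can arrive as null, so guard against that explicitly.
spark.udf.register("in_countries",
  (country: String) => country != null && countriesBroadcast.value.contains(country))

// The SQL text only sees the UDF; the broadcast variable stays inside it.
val resultDf = spark.sql("SELECT name, phone, country FROM users WHERE in_countries(country)")
resultDf.show() // with the sample rows above, this should keep users b and c
```

Note that the query never names countriesBroadcast itself. The variable is only reachable through the function body, which is the point of the answer above.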

To use broadcasting with the DataFrame / SQL API, keep the data in DataFrames and use the broadcast hint, which asks Spark SQL to plan a broadcast hash join.
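The hint can also be written in the SQL text itself. The following is a rough sketch under a few assumptions: a local SparkSession, in-memory stand-ins for the users and countries views, and a Spark version that accepts the BROADCAST hint comment in SQL (as far as I know, 2.2 or later). The hint is only a request to the planner, so it is worth checking the plan with explain().

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session, just for trying this out.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint-sketch").getOrCreate()
import spark.implicits._

// Assumption: illustrative stand-in for the asker's "users" table.
Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"))
  .toDF("name", "phone", "country")
  .createOrReplaceTempView("users")

// Expose the small collection as a one-column view so SQL can join against it.
Seq("Italy", "France", "United States", "Poland", "Spain")
  .toDF("country")
  .createOrReplaceTempView("countries")

// The hint comment asks the planner to broadcast the "countries" side of the join.
val resultDf = spark.sql(
  """SELECT /*+ BROADCAST(countries) */ users.name, users.phone, users.country
    |FROM users
    |JOIN countries ON users.country = countries.country""".stripMargin)

resultDf.explain() // inspect the physical plan to see which join strategy was chosen
resultDf.show()    // with the sample rows above, this should keep users b and c
```

Here the filtering is done by an inner join rather than by a WHERE clause, so a user whose country is missing from the countries view is dropped from the result.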

Upvotes: 2
