Cheeko

Reputation: 1223

Spark UDF exception when accessing broadcast variable

I'm having difficulty accessing a scala.collection.immutable.Map from inside a Spark UDF.

I'm broadcasting the map:

import scala.io.Source

val browserLangMap = sc.broadcast(Source.fromFile(browserLangFilePath).getLines.map(_.split(",")).map(e => (e(0).toInt, e(1))).toMap)

Creating a UDF that accesses the map:

def addBrowserCode = udf((browserLang: Int) => if (browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")
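
As an aside, a common way to keep a UDF closure serializable is to copy the broadcast handle into a local val first, so the closure captures only the serializable Broadcast handle rather than the enclosing object. A minimal sketch (langMap is a name introduced here, not from the original code):

val langMap = browserLangMap  // sketch: local copy of the Broadcast[Map[Int, String]] handle
def addBrowserCode = udf((browserLang: Int) => langMap.value.getOrElse(browserLang, ""))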

Using the UDF to add a new column:

val joinedDF = rawDF.join(broadcast(geoDF).as("GEO"), $"start_ip" === $"GEO.start_ip_num", "left_outer")
                        .withColumn("browser_code", addBrowserCode($"browser_language"))
                        .selectExpr(getSelectQuery:_*)

Full stack trace: https://www.dropbox.com/s/p1d5322fo9cxro6/stack_trace.txt?dl=0

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$
Serialization stack:
        - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$@30b4ba52)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: MetaDataSchema$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$MetaDataSchema$)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(browser_language#235))
        - field (class: org.apache.spark.sql.catalyst.expressions.If, name: falseValue, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.If, if (isnull(browser_language#235)) null else UDF(browser_language#235))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, if (isnull(browser_language#235)) null else UDF(browser_language#235) AS browser_language#507)
        - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@5ae38c4e)
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@5ae38c4e))
        - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 80 more

I know it's the access to the broadcast map that is causing this; when I remove the reference to it in the UDF, there is no exception.

def addBrowserCode = udf((browserLang: Int) => browserLang.toString)  // Test UDF that does not access the broadcast map; this works

Spark version 1.6

Upvotes: 4

Views: 3251

Answers (2)

Jonathan

Reputation: 267

The root cause is declaring val sc: SparkContext = spark.sparkContext in the code that creates the broadcast variable. When the code runs in a spark-shell, sc is already available by default, so declaring sc a second time (once by default and once in the code) causes this "Task not serializable" problem. Therefore, contrary to what the previous answer claims, there is no issue with the spark-shell itself: just remove the SparkContext declaration while in the spark-shell and the code will work.
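
A minimal sketch of that distinction, assuming Spark 2.x (where the shell predefines both spark and sc; BrowserCodeApp is a hypothetical name):

// In spark-shell, sc already exists, so re-declaring it pulls the REPL wrapper
// object into the UDF's closure:
// val sc: SparkContext = spark.sparkContext   // remove this line in the shell

// In a compiled application there is no predefined sc, so the declaration belongs there:
import org.apache.spark.sql.SparkSession

object BrowserCodeApp {  // hypothetical object name
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("browser-code").getOrCreate()
    val sc = spark.sparkContext  // fine here: nothing pre-existing to shadow
    // ... create the broadcast, UDF, and join as in the question ...
    spark.stop()
  }
}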

Upvotes: 1

Cheeko

Reputation: 1223

I found this to be strange behavior of ":paste" in the Spark shell: it happens only when I paste my entire code in a single multi-line paste with :paste.

The same code works perfectly if I paste the broadcast and UDF creation first, and then paste the join + saveToFile in a separate :paste (sketched below).

Maybe it's a Scala shell issue; I don't know.
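
For reference, a sketch of the two-paste split described above, reusing the code from the question (imports added so each paste stands alone; the $-column syntax assumes the shell's SQL implicits are in scope):

// First :paste (end with Ctrl-D): only the broadcast and the UDF definition.
import scala.io.Source
import org.apache.spark.sql.functions.{broadcast, udf}

val browserLangMap = sc.broadcast(Source.fromFile(browserLangFilePath).getLines.map(_.split(",")).map(e => (e(0).toInt, e(1))).toMap)
def addBrowserCode = udf((browserLang: Int) => if (browserLangMap.value.contains(browserLang)) browserLangMap.value(browserLang) else "")

// Second, separate :paste: the join that triggers closure serialization.
val joinedDF = rawDF.join(broadcast(geoDF).as("GEO"), $"start_ip" === $"GEO.start_ip_num", "left_outer")
  .withColumn("browser_code", addBrowserCode($"browser_language"))
  .selectExpr(getSelectQuery: _*)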

Upvotes: 4
