Reputation: 7605
I need to extract and transform some information from a big dataset; that information will later be consumed by another dataset.
Since the information to be consumed is always the same, and since it can be stored as key-value pairs, I was considering saving it in a lookup map to be consumed by a UDF, so that I avoid repeated calls to the big dataset.
The problem is I am getting the following error:
org.apache.spark.SparkException: Task not serializable
Is there any way to make the map serializable?
If that is not possible, is there another way to store information in a lookup object in Spark?
Here is my code:
val cityTimeZone: Map[String, Double] = Map(
  "CEB" -> 8.0, "LGW" -> 0.0, "CPT" -> 2.0, "MUC" -> 1.0, "SGN" -> 7.0,
  "BNE" -> 10.0, "DME" -> 3.0, "FJR" -> 4.0, "BAH" -> 3.0, "ARN" -> 1.0,
  "FCO" -> 1.0, "DUS" -> 1.0, "MRU" -> 4.0, "JFK" -> -5.0, "GLA" -> 0.0)
def getLocalHour = udf((city: String, timeutc: Int) => {
  val timeOffset = cityTimeZone(city)
  val localtime =
    if ((timeutc + timeOffset) % 24 >= 0) (timeutc + timeOffset) % 24
    else ((timeutc + timeOffset) % 24) * (-1)
  localtime
})
//$"dateutc" is a timestamp column like this: 2017-03-01 03:45:00, and $"city" is a 3-letter code in capitals, like those in the map above
val newDF = DF
.select("dateutc","city")
.withColumn("utchour", hour($"dateutc"))
.withColumn("localhour", getLocalHour($"city", $"utchour"))
display(newDF)
Upvotes: 3
Views: 2379
Reputation: 7605
Since people are still reaching this question: Andrey's answer helped me back then, but nowadays I can give a more general rule for org.apache.spark.SparkException: Task not serializable:
don't declare variables in the driver as "global variables" and then access them in the executors.
The mistake I was making here was declaring the map cityTimeZone
in the driver while planning to use it inside the udf, whose computation happens in the executors.
Possible solutions are to pass cityTimeZone
as a third parameter to the udf getLocalHour,
or to declare the map inside the udf itself.
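A minimal sketch of the first option: the map becomes an explicit argument, so wrapping the function in a udf captures only a small, serializable immutable map. localHour is a hypothetical helper name, and the Spark wiring is shown only in comments so the sketch stands on its own:

```scala
// Hypothetical sketch: pass the map explicitly instead of capturing
// a driver-side member variable.
val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "JFK" -> -5.0)

// Same arithmetic as getLocalHour, but the map is an explicit argument.
def localHour(tz: Map[String, Double])(city: String, timeutc: Int): Double = {
  val shifted = (timeutc + tz(city)) % 24
  if (shifted >= 0) shifted else shifted * (-1)
}

// In Spark this would become (untested sketch):
//   val getLocalHour = udf(localHour(cityTimeZone) _)
//   df.withColumn("localhour", getLocalHour($"city", $"utchour"))
```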
Upvotes: 0
Reputation: 44908
The member variable declaration
val cityTimeZone
in combination with
cityTimeZone(city)
inside the udf is problematic, because the latter is just a shortcut for
this.cityTimeZone(city)
where this
is (presumably) some huge non-serializable object (probably because it contains a reference to a non-serializable Spark context).
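The capture of this can be seen in plain Scala, without Spark; the Driver class below is only an illustration standing in for the enclosing notebook/driver object:

```scala
// Illustration only: a member val used inside a lambda is really
// this.offsets, so the lambda closes over the whole Driver instance.
// If Driver also held a SparkContext, the closure would drag it along
// and fail serialization.
class Driver {
  val offsets: Map[String, Double] = Map("CEB" -> 8.0)
  val lookup: String => Double = city => offsets(city) // compiles to this.offsets(city)
}
```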
Make getLocalHour
a lazy val,
and move the map that is needed by the udf
inside the definition of getLocalHour
as a local variable, something along these lines:
lazy val getLocalHour = {
  val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0)
  udf((city: String, timeutc: Int) => {
    val timeOffset = cityTimeZone(city)
    val localtime =
      if ((timeutc + timeOffset) % 24 >= 0) (timeutc + timeOffset) % 24
      else ((timeutc + timeOffset) % 24) * (-1)
    localtime
  })
}
Alternatively, attach cityTimeZone
to some serializable object (i.e. some object that does not contain references to any threads, sockets, Spark contexts, or any other non-serializable stuff; e.g. package objects with utility methods and constants would be fine).
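For instance, a standalone object holding only the constant map; the name TimeZones is made up for illustration, and the Spark wiring is sketched in comments:

```scala
// Sketch of the alternative: the constant map lives in a standalone
// top-level object. "TimeZones" is a hypothetical name. Top-level
// objects are accessed statically, so a udf body referencing
// TimeZones.cityTimeZone does not capture any enclosing
// non-serializable instance.
object TimeZones {
  val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0)
}

// Spark wiring (untested sketch):
//   val getLocalHour = udf((city: String, h: Int) =>
//     (h + TimeZones.cityTimeZone(city)) % 24)
```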
If the udf
definition contains references to any other member variables, treat those accordingly.
Upvotes: 1