Reputation: 71
I have a broadcast variable that is constructed as follows:
// Function
def loadMovieNames(sparkContext: SparkContext): Map[Int, String] = {
  // Handle character encoding issues:
  implicit val codec = Codec("UTF-8")
  codec.onMalformedInput(CodingErrorAction.REPLACE)
  codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
  // Create a Map of Ints to Strings, and populate it from u.item.
  var movieNames: Map[Int, String] = Map()
  val lines = sparkContext.textFile("s3a://bucket/movies.dat")
  for (line <- lines) {
    var fields = line.split("::")
    if (fields.length > 1) {
      movieNames += (fields(0).toInt -> fields(1))
    }
  }
  return movieNames
}

// Main
val nameDict = loadMovieNames(spark.sparkContext)
val broadcastNames = spark.sparkContext.broadcast(nameDict)
Below is the code in main used to access the broadcast variable.
val resultDF = recommendationsDF.sort($"score".desc).limit(30)
val check = (id1: Int, id2: Int) => if (id1 == movie) broadcastNames.value(id2) else broadcastNames.value(id1)
val getName = udf(check)
val results = resultDF.withColumn("movie", getName($"movieId1", $"movieId2"))
results.show(30)
But when I try to do a lookup in the broadcast variable later in main, I get the following exception.
Caused by: java.util.NoSuchElementException: key not found: 1196
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at com.spark.movierec.MovieRecDF$$anonfun$5.apply(MovieRecDF.scala:144)
at com.spark.movierec.MovieRecDF$$anonfun$5.apply(MovieRecDF.scala:143)
I converted the Map to a broadcast variable when I initially ran into the same issue. After reading the answer to this question here, I've realized it could be an issue with closures. But I'm still not sure how to tackle the issue.
Upvotes: 0
Views: 5780
Reputation: 330063
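In the question's code, the for loop over the RDD is a foreach that runs on the executors, so each task updates its own deserialized copy of movieNames and the map on the driver typically stays empty, hence the "key not found" error. One way to create a local map on the driver is to use collectAsMap: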
val nameDict = sparkContext.broadcast(sparkContext
.textFile(path)
.map(_.split("::"))
.filter(_.size > 1)
.map(arr => (arr(0).toInt, arr(1)))
.collectAsMap())
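To see what the pipeline above produces without a cluster, the same split, filter, and map steps can be run on an ordinary Scala collection. This is only an illustrative sketch: the sample lines and the names ParseSketch and parseNames are hypothetical, and toMap stands in for collectAsMap.

```scala
object ParseSketch {
  // Same steps as the RDD pipeline, applied to a local collection.
  // toMap is a stand-in for collectAsMap (assumption for illustration).
  def parseNames(lines: Seq[String]): Map[Int, String] =
    lines
      .map(_.split("::"))
      .filter(_.length > 1)                 // drop lines without a name field
      .map(arr => (arr(0).toInt, arr(1)))
      .toMap

  def main(args: Array[String]): Unit = {
    // Hypothetical sample lines in the id::title::... layout.
    val sample = Seq("1::Movie A::Genre", "2::Movie B::Genre", "malformed")
    val names = parseNames(sample)

    println(names.get(1))                      // Some(Movie A)
    // getOrElse avoids the NoSuchElementException that names(1196) would throw.
    println(names.getOrElse(1196, "unknown"))  // unknown
  }
}
```

Inside the UDF, a lookup such as broadcastNames.value.getOrElse(id, "unknown") would likewise return a placeholder instead of failing the task when an ID is missing.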
You should also consider using DataFrames and a broadcast join in place of the UDF and broadcast variable, but the logic of your application is not that clear.
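A rough sketch of the broadcast-join alternative might look like the following. The column names movieId1, movieId2, and score and the variable movie come from the question; everything else (the names DataFrame with columns id and name, the helper attachNames, the otherId column, and the sample rows) is assumed purely for illustration, since the real schema is not shown.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col, when}

object BroadcastJoinSketch {
  // Hypothetical helper: picks the "other" movie ID of each pair and
  // joins it against a small (id, name) DataFrame marked for broadcast.
  def attachNames(recs: DataFrame, names: DataFrame, movie: Int): DataFrame = {
    val withOther = recs.withColumn(
      "otherId",
      when(col("movieId1") === movie, col("movieId2")).otherwise(col("movieId1")))

    withOther
      // A left join keeps rows whose ID has no name; "movie" is then null
      // instead of the lookup throwing NoSuchElementException.
      .join(broadcast(names), col("otherId") === col("id"), "left")
      .select(col("movieId1"), col("movieId2"), col("score"), col("name").as("movie"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BroadcastJoinSketch")
      .master("local[*]") // local mode, only for trying the sketch out
      .getOrCreate()
    import spark.implicits._

    val movie = 50 // hypothetical ID of the movie being queried

    // Hypothetical sample data standing in for the real inputs.
    val recommendationsDF = Seq((50, 1196, 0.98), (172, 50, 0.95))
      .toDF("movieId1", "movieId2", "score")
    val namesDF = Seq((1196, "Movie A"), (172, "Movie B")).toDF("id", "name")

    val top = recommendationsDF.sort(col("score").desc).limit(30)
    val results = attachNames(top, namesDF, movie).sort(col("score").desc)
    results.show(30, truncate = false)

    spark.stop()
  }
}
```

With this shape the name lookup stays inside the DataFrame plan, so no driver-side Map or UDF is needed; whether it fits depends on how the rest of the application uses the names.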
Upvotes: 3