Reputation: 5389
I have a fairly small lookup file that I need to broadcast for efficiency.
If the key-value pairs are unique, the following code distributes the file as a hash map across the worker nodes.
val index_file = sc.textFile("reference.txt").map { line =>
  val fields = line.split("\t") // split once, reuse both columns
  (fields(1), fields(0))
}
val index_map = index_file.collectAsMap()
val index_bc = sc.broadcast(index_map) // keep the handle; workers read it via index_bc.value
Unfortunately, the file has several entries for a given key. Is there any way to distribute this multimap variable? From the documentation, it looks like collectAsMap does not support a multimap.
val mmap = new collection.mutable.HashMap[String, collection.mutable.Set[Int]]() with collection.mutable.MultiMap[String, Int]
val index_map = sc.textFile("reference.txt").map {
case line =>
val key = (line.split("\t"))(1)
val value = (line.split("\t"))(0).toInt
mmap.addBinding(key, value)
}
Now how do I broadcast index_map?
Upvotes: 2
Views: 666
Reputation: 13154
You can broadcast the map using sc.broadcast(mmap), but that simply distributes a read-only copy of the map to your worker nodes, so that the data is accessible there.
From your code, it looks like what you really want is to update the map from the workers, but you cannot do that. The workers do not have the same instance of the map, so they will each update their own map. What you can do instead is split the text file into key-value pairs (in parallel), then collect them and put them into the map:
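val mmap = new collection.mutable.HashMap[String, collection.mutable.Set[Int]]() with collection.mutable.MultiMap[String, Int]
sc.textFile("reference.txt")
  .map { line =>
    // runs in parallel on the workers: parse each line into a (key, value) pair
    val fields = line.split("\t")
    (fields(1), fields(0).toInt)
  }
  .collect() // bring the pairs back to the driver
  .foreach { case (key, value) => mmap.addBinding(key, value) } // fill the map on the driver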
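If you would rather not mutate a MultiMap, here is a minimal sketch of the same idea that builds an immutable Map[String, Set[Int]] instead. The LookupIndex object and its parseLine and build methods are names I made up for illustration, and the sketch assumes every line has the form value<TAB>key with an integer value, as in your file.

```scala
// Hypothetical helper (not part of Spark): builds an immutable multimap
// from lines of the assumed form "value<TAB>key".
object LookupIndex {
  // Parse one line into a (key, value) pair; assumes the value column is an Int.
  def parseLine(line: String): (String, Int) = {
    val fields = line.split("\t")
    (fields(1), fields(0).toInt)
  }

  // Group all values that share a key into a Set.
  def build(lines: Iterable[String]): Map[String, Set[Int]] =
    lines
      .map(parseLine)
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).toSet }
}
```

On the driver you could then call LookupIndex.build(sc.textFile("reference.txt").collect()) and pass the result to sc.broadcast. Workers read the map through the broadcast's .value, for example with getOrElse(key, Set.empty) for keys that may be missing.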
Using Spark for a task where the data fits in a map seems like overkill to me, though ;)
Upvotes: 1