Shangfu

Reputation: 41

Spark in Scala: How to avoid a linear scan when searching for a key in each partition?

I have one huge key-value dataset named A, and a set of keys named B as queries. My task is, for each key in B, to return whether the key exists in A and, if it does, return its value.

I partition A by HashPartitioner(100) first. Currently I can use A.join(B') to solve it, where B' = B.map(x => (x, null)), or I can call A.lookup() for each key in B.
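For reference, the two approaches look roughly like this (assuming A: RDD[(Long, Double)] and B: RDD[Long], the types I use below):

    import org.apache.spark.HashPartitioner

    val partitionedA = A.partitionBy(new HashPartitioner(100)).cache()

    // join: shuffles B' to A's partitions, then scans each partition linearly
    val viaJoin = partitionedA.join(B.map(x => (x, null))).mapValues(_._1)

    // lookup: one Spark job per key, again scanning the partition linearly
    val viaLookup = B.collect().map(k => (k, partitionedA.lookup(k).headOption))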

However, the problem is that both join and lookup on a PairRDD do a linear scan within each partition. This is too slow. Ideally, each partition would be a HashMap, so that a key can be found in its partition in O(1). So the ideal strategy is: when the master machine receives a bunch of keys, it assigns each key to its corresponding partition; the partition then uses its HashMap to find the keys and returns the results to the master.

Is there an easy way to achieve it?

One potential approach: searching online, I found a similar question here:

http://mail-archives.us.apache.org/mod_mbox/spark-user/201401.mbox/%3CCAMwrk0kPiHoX6mAiwZTfkGRPxKURHhn9iqvFHfa4aGj3XJUCNg@mail.gmail.com%3E

Following that suggestion, I built a HashMap for each partition with the code below:

    val hashpair = A.mapPartitions(iterator => {
        // Build an in-memory map of this partition's pairs; getOrElseUpdate
        // keeps the first value seen if a key appears more than once.
        val hashmap = new scala.collection.mutable.HashMap[Long, Double]
        iterator.foreach { case (key, value) => hashmap.getOrElseUpdate(key, value) }
        Iterator(hashmap)
    }, preservesPartitioning = true) // keep A's HashPartitioner for routing queries

Now I get 100 HashMaps (one for each of A's 100 partitions). Here I'm lost: I don't know how to issue queries, i.e. how to use hashpair to search for the keys in B, since hashpair is not a regular pair RDD. Do I need to implement a new RDD and implement RDD methods for hashpair? If so, what is the easiest way to implement join or lookup methods for it?
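For concreteness, what I imagine is something like the sketch below (I'm not sure this is the right approach): partition B with the same HashPartitioner(100), so each query key lands in the partition whose HashMap covers it, then zip the two RDDs partition by partition.

    import org.apache.spark.HashPartitioner

    // Route each query key to the partition that would hold it in A.
    val partitionedB = B.map(k => (k, null)).partitionBy(new HashPartitioner(100))

    // Zip partition-by-partition: each partition of hashpair holds exactly
    // one HashMap, and partitionedB holds the query keys routed to it.
    val results = hashpair.zipPartitions(partitionedB) { (mapIter, keyIter) =>
        val hashmap = mapIter.next() // exactly one map per partition
        keyIter.map { case (k, _) => (k, hashmap.get(k)) } // O(1) per key
    }
    // results: RDD[(Long, Option[Double])]; None means the key is not in A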

Thanks all.

Upvotes: 3

Views: 551

Answers (1)

Dan Osipov

Reputation: 1431

You're probably looking for IndexedRDD: https://github.com/amplab/spark-indexedrdd
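Usage looks roughly like this (adapted from the project's README; package names and the exact API may differ across versions, so treat this as a sketch):

    import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
    import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

    // Build the hash index over A's partitions once, then query without a linear scan.
    val indexed = IndexedRDD(A).cache()

    indexed.get(42L)              // Option[Double] for a single key
    indexed.multiget(B.collect()) // Map[Long, Double] for a batch of keys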

Upvotes: 6
