Reputation: 628
I’m looking for a way to compare subsets of an RDD intelligently.
Lets say I had an RDD with key/value pairs of type (Int->T). I eventually need to say “compare all values of key 1 with all values of key 2 and compare values of key 3 to the values of key 5 and key 7”, how would I go about doing this efficiently?
The way I’m currently thinking of doing it is by creating a List of filtered RDDs and then using RDD.cartesian()
def filterSubset[T] = (b:Int, r:RDD[(Int, T)]) => r.filter{case(name, _) => name == b}
Val keyPairs:(Int, Int) // all key pairs
Val rddPairs = keyPairs.map{
case (a, b) =>
filterSubset(a,r).cartesian(filterSubset(b,r))
}
rddPairs.map{whatever I want to compare…}
I would then iterate the list and perform a map on each of the RDDs of pairs to gather the relational data that I need.
What I can’t tell about this idea is whether it would be extremely inefficient to set up possibly of hundreds of map jobs and then iterate through them. In this case, would the lazy valuation in spark optimize the data shuffling between all of the maps? If not, can someone please recommend a possibly more efficient way to approach this issue?
Thank you for your help
Upvotes: 2
Views: 640
Reputation: 11
Using Dataframe you can easily do the cartesian operation using join:
dataframe1.join(dataframe2, dataframe1("key")===dataframe2("key"))
It will probably do exactly what you want, but efficiently.
If you don't know how to create an Dataframe, please refer to http://spark.apache.org/docs/latest/sql-programming-guide.html#creating-dataframes
Upvotes: 0
Reputation: 330303
One way you can approach this problem is to replicate and partition your data to reflect key pairs you want to compare. Lets start with creating two maps from the actual keys to the temporary keys we'll use for replication and joins:
def genMap(keys: Seq[Int]) = keys
.zipWithIndex.groupBy(_._1)
.map{case (k, vs) => (k -> vs.map(_._2))}
val left = genMap(keyPairs.map(_._1))
val right = genMap(keyPairs.map(_._2))
Next we can transform data by replicating with new keys:
def mapAndReplicate[T: ClassTag](rdd: RDD[(Int, T)], map: Map[Int, Seq[Int]]) = {
rdd.flatMap{case (k, v) => map.getOrElse(k, Seq()).map(x => (x, (k, v)))}
}
val leftRDD = mapAndReplicate(rddPairs, left)
val rightRDD = mapAndReplicate(rddPairs, right)
Finally we can cogroup:
val cogrouped = leftRDD.cogroup(rightRDD)
And compare / filter pairs:
cogrouped.values.flatMap{case (xs, ys) => for {
(kx, vx) <- xs
(ky, vy) <- ys
if cosineSimilarity(vx, vy) <= threshold
} yield ((kx, vx), (ky, vy)) }
Obviously in the current form this approach is limited. It assumes that values for arbitrary pair of keys can fit into memory and require a significant amount of network traffic. Still it should give you some idea how to proceed.
Another possible approach is to store data in the external system (for example database) and fetch required key-value pairs on demand.
Since you're trying to find similarity between elements I would also consider completely different approach. Instead of naively comparing key-by-key I would try to partition data using custom partitioner which reflects expected similarity between documents. It is far from trivial in general but should give much better results.
Upvotes: 3