EdgeRover
EdgeRover

Reputation: 195

Spark: Get elements of an RDD based on the elements of an array in another RDD

In Spark Scala framework, I have an RDD, rdd1, in which each element represents a single element of a matrix A:

val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}

x represents the row, y represents the column and v represents the value in matrix A.

I also have another RDD, rdd2, in the form of RDD[index, Array[(x, y)]] where the array in each element represents the set of elements of the matrix A, which are stored in rdd1, needed for the specific index represented in that element.

Now what I need to do, is get the values of the matrix A elements for each index, preserving all data including index, (x,y) and v. What would be a good approach in doing this ?

Upvotes: 0

Views: 1346

Answers (1)

Alfredo Gimenez
Alfredo Gimenez

Reputation: 2224

If I understand correctly, your question boils down to:

val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
  ((0, 0), 5.5),            
  ((1, 0), 7.7)
))

val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
  (123, Array((0, 0), (1, 0))) 
))

And you want to merge these RDDs to get all values (index, (x, y), v), in this case, (123, (0,0), 5.5) and (123, (1,0), 7.7) ?

You can definitely do this using join, since both RDDs have a common column (x, y), but since one of them actually has an Array[(x, y)] you'd have to explode that into a set of rows first:

val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}}
// Each row exploded into multiple rows (index, (x, y))

val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)

val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)

// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)

Upvotes: 1

Related Questions