Reputation: 195
In the Spark Scala framework, I have an RDD, rdd1, in which each element represents a single element of a matrix A:
val rdd1 = dist.map { case ((x, y), z, v) => ((x, y), v) }
Here x represents the row, y represents the column, and v represents the value in matrix A.
I also have another RDD, rdd2, of the form RDD[(index, Array[(x, y)])], where the array in each element holds the set of elements of matrix A (stored in rdd1) that are needed for the index in that element.
Now what I need to do is get the values of the matrix A elements for each index, preserving all of the data: index, (x, y) and v. What would be a good approach to doing this?
Upvotes: 0
Views: 1346
Reputation: 2224
If I understand correctly, your question boils down to:
val valuesRdd = sc.parallelize(Seq(
  // ((x, y), v)
  ((0, 0), 5.5),
  ((1, 0), 7.7)
))
val indicesRdd = sc.parallelize(Seq(
  // (index, Array[(x, y)])
  (123, Array((0, 0), (1, 0)))
))
And you want to merge these RDDs to get all the values (index, (x, y), v), in this case (123, (0, 0), 5.5) and (123, (1, 0), 7.7)?
You can definitely do this using join, since both RDDs share a common key, (x, y). But because one of them actually has an Array[(x, y)], you first have to explode that array into one row per coordinate pair:
// Explode each (index, Array[(x, y)]) row into multiple (index, (x, y)) rows
val explodedIndices = indicesRdd.flatMap { case (index, coords) =>
  coords.map { case (x, y) => (index, (x, y)) }
}

// Key both RDDs by the coordinates (x, y)
val keyedIndices = explodedIndices.keyBy { case (index, (x, y)) => (x, y) }
val keyedValues = valuesRdd.keyBy { case ((x, y), v) => (x, y) }

// Because we now have a common key, we can join!
val joined = keyedIndices.join(keyedValues)
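The join gives you rows of the shape ((x, y), ((index, (x, y)), ((x, y), v))), so if you want flat (index, (x, y), v) triples you can add one more map at the end. A minimal sketch (result is just an illustrative name):
val result = joined.map { case ((x, y), ((index, _), (_, v))) =>
  // Drop the duplicated coordinates and keep index, (x, y) and v
  (index, (x, y), v)
}

result.collect().foreach(println)
// prints (order may vary):
// (123,(0,0),5.5)
// (123,(1,0),7.7)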
Upvotes: 1