Reputation: 3399
I have a Spark RDD of type (Array[breeze.linalg.DenseVector[Double]], breeze.linalg.DenseVector[Double]). I wish to flatten its key to transform it into an RDD of type (breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double]). I am currently doing:
val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))
The signature of anonymousOrdering() is String => (Array[DenseVector[Double]], DenseVector[Double]). This fails to compile with the error type mismatch; required: TraversableOnce[?]. The Python code doing the same thing is:
newRDD = oldRDD.flatMap(lambda point: [(tile, point) for tile in anonymousOrdering(point)])
How can I do the same thing in Scala? I generally use flatMapValues, but here I need to flatten the key.
Upvotes: 0
Views: 451
Reputation: 3399
Since anonymousOrdering() is a function in your own code, update it to return a Seq[(breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])]. This is like doing (tile, point) for tile in anonymousOrdering(point), but directly at the end of the function itself. The flatMap will then emit one record for each element of the returned sequence.
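A minimal sketch of what this could look like; the input type below is an assumption, since the signature quoted in the question (String => ...) does not match how the function is applied to the RDD elements:
import breeze.linalg.DenseVector

// Hypothetical rewrite: the input type is assumed to be the RDD element type
def anonymousOrdering(ob: (Array[DenseVector[Double]], DenseVector[Double])): Seq[(DenseVector[Double], DenseVector[Double])] = {
  val (tiles, point) = ob
  tiles.toSeq.map(tile => (tile, point)) // one (tile, point) pair per tile, as in the Python version
}

// flatMap then flattens each returned Seq into individual records
val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))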
As a general rule, avoid having a collection as the key of an RDD.
Upvotes: 0
Reputation: 2224
If I understand your question correctly, you can do:
val newRDD = oldRDD.map(ob => anonymousOrdering(ob))
// newRDD is RDD[(Array[DenseVector], DenseVector)]
In that case, you can "flatten" the Array portion of the tuple using pattern matching and a for/yield expression:
val flatRDD = newRDD.flatMap { case (a: Array[DenseVector[Double]], b: DenseVector[Double]) => for (v <- a) yield (v, b) }
// flatRDD is RDD[(DenseVector, DenseVector)]
Although it's still not clear to me where/how you want to use groupByKey.
Upvotes: 2
Reputation: 107
Change the code to use map instead of flatMap:
val newRDD = oldRDD.map(ob => anonymousOrdering(ob)).groupByKey()
You would only want to use flatMap here if anonymousOrdering returned a list of tuples and you wanted it flattened down.
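For illustration, a minimal sketch of that variant, assuming anonymousOrdering were changed to return a sequence of pairs:
// Hypothetical: anonymousOrdering returns Seq[(DenseVector[Double], DenseVector[Double])]
val flattened = oldRDD.flatMap(ob => anonymousOrdering(ob)) // one record per (key, value) pair
val grouped = flattened.groupByKey() // RDD[(DenseVector[Double], Iterable[DenseVector[Double]])]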
Upvotes: 0