Armand Grillet

Reputation: 3399

Flattening the key of an RDD

I have a Spark RDD of type (Array[breeze.linalg.DenseVector[Double]], breeze.linalg.DenseVector[Double]). I want to flatten its key to turn it into an RDD of type (breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double]). I am currently doing:

val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))

The signature of anonymousOrdering() is String => (Array[DenseVector[Double]], DenseVector[Double]).

This fails to compile with a type mismatch: required: TraversableOnce[?]. The Python code doing the same thing is:

newRDD = oldRDD.flatMap(lambda point: [(tile, point) for tile in anonymousOrdering(point)])

How can I do the same thing in Scala? I generally use flatMapValues, but here I need to flatten the key.

Upvotes: 0

Views: 451

Answers (3)

Armand Grillet

Reputation: 3399

As anonymousOrdering() is a function that you have in your code, update it to return a Seq[(breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])]. It is like doing [(tile, point) for tile in anonymousOrdering(point)], but directly at the end of the function. The flatMap will then take care of creating one record in the resulting RDD for each element of the sequences.
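A minimal sketch of that change, under some loud assumptions: Vec is a stand-in for breeze.linalg.DenseVector[Double], a local Seq stands in for the RDD so it runs without Spark, and the body of anonymousOrdering (floor/ceil tiles) and the name anonymousOrderingFlat are made up for illustration. Only the return types matter here.

```scala
object FlattenKeySketch {
  // Assumption: Vec stands in for breeze.linalg.DenseVector[Double].
  type Vec = Vector[Double]

  // Hypothetical original shape: one input line yields (tiles, point).
  def anonymousOrdering(line: String): (Array[Vec], Vec) = {
    val point: Vec = line.split(",").map(_.toDouble).toVector
    val tiles: Array[Vec] = Array(point.map(math.floor), point.map(math.ceil))
    (tiles, point)
  }

  // Updated shape: pair every tile with the point before returning,
  // so the result is a collection that flatMap can flatten.
  def anonymousOrderingFlat(line: String): Seq[(Vec, Vec)] = {
    val (tiles, point) = anonymousOrdering(line)
    tiles.toSeq.map(tile => (tile, point))
  }

  // Assumption: a local Seq replaces oldRDD; flatMap has the same shape on an RDD.
  val oldData: Seq[String] = Seq("1.5,2.5", "3.5,4.5")
  val flattened: Seq[(Vec, Vec)] = oldData.flatMap(anonymousOrderingFlat)
}
```

Each input line contributes one (tile, point) pair per tile, so the two lines above end up as four pairs.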

As a general rule, avoid having a collection as a key in a RDD.

Upvotes: 0

Alfredo Gimenez

Reputation: 2224

If I understand your question correctly, you can do:

// map (not flatMap), since anonymousOrdering returns a single tuple per record
val newRDD = oldRDD.map(ob => anonymousOrdering(ob))
// newRDD is RDD[(Array[DenseVector[Double]], DenseVector[Double])]

In that case, you can "flatten" the Array portion of the tuple using pattern matching and a for/yield statement:

// a val cannot be reassigned and the element type changes, so bind a new name
val flatRDD = newRDD.flatMap { case (a, b) => for (v <- a) yield (v, b) }
// flatRDD is RDD[(DenseVector[Double], DenseVector[Double])]

It's still not clear to me where or how you want to use groupByKey, though.

Upvotes: 2

Michael Stratton

Reputation: 107

Change the code to use map instead of flatMap:

val newRDD = oldRDD.map(ob => anonymousOrdering(ob)).groupByKey()

You would only want to use flatMap here if anonymousOrdering returned a list of tuples and you wanted it flattened down.
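To make the difference concrete, here is a small sketch on plain Scala collections. It is an illustration rather than Spark code: Vec is an assumed stand-in for DenseVector[Double], the sample record is invented, and a local Seq plays the role of the RDD.

```scala
object MapVsFlatMapSketch {
  // Assumption: Vec stands in for breeze.linalg.DenseVector[Double].
  type Vec = Vector[Double]

  // One invented record: two tiles as the key, one point as the value.
  val record: (Array[Vec], Vec) =
    (Array(Vector(0.0), Vector(1.0)), Vector(0.5))
  val data: Seq[(Array[Vec], Vec)] = Seq(record)

  // map produces exactly one output per input, so the Array key stays nested.
  val mapped: Seq[(Array[Vec], Vec)] = data.map(identity)

  // flatMap expects a collection per input and concatenates the results,
  // giving one (tile, point) pair per tile.
  val flat: Seq[(Vec, Vec)] =
    data.flatMap { case (tiles, point) => tiles.toSeq.map(t => (t, point)) }
}
```

With map the single record stays a single record; with flatMap it becomes two pairs, one per tile.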

Upvotes: 0
