Baptiste Merliot

Reputation: 861

flatten Options inside Spark RDD

I'm trying to reproduce the behaviour of vanilla Scala collections' flatten on a Spark RDD of Options. For example:

Seq(Some(1), None, Some(2), None, None).flatten
> Seq[Int] = List(1, 2)
// None are removed, and Some are unwrapped

sc.parallelize(Seq(Some(1), None, Some(2))).flatten.collect()
> error: value flatten is not a member of org.apache.spark.rdd.RDD[Option[Int]]
>      sc.parallelize(Seq(Some(1), None, Some(2))).flatten.collect()
// The function does not exist for RDDs

Of course the following works, but it means collecting before filtering, i.e. pulling the larger, unfiltered collection onto a single machine (the driver).

sc.parallelize(Seq(Some(1), None, Some(2))).collect().flatten
> Array[Int] = Array(1, 2)

The solution I found is:

sc.parallelize(Seq(Some(1), None, Some(2))).filter(_.isDefined).map(_.get).collect()

but that's not very clear. Is there a cleaner way?

Upvotes: 0

Views: 270

Answers (2)

Leo C

Reputation: 22449

You can simply perform flatMap with an identity function:

sc.parallelize(Seq(Some(1), None, Some(2))).flatMap(x => x).collect
// res1: Array[Int] = Array(1, 2)
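
This works because Scala's implicit conversion Option.option2Iterable turns each Option into an Iterable of zero or one elements, so flatMap drops the Nones and unwraps the Somes.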

Upvotes: 3

YisraelU

Reputation: 372

You can define an extension method on the container class to facilitate flattening, in keeping with the coding style you would like to use.

Generally one does this in Scala using implicit classes, as sketched below.
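
A minimal sketch, assuming a spark-shell session with sc in scope (the RDDExtensions and OptionRDDOps names are my own, purely illustrative, not part of Spark's API):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object RDDExtensions {
  // Adds a `flatten` method to any RDD[Option[A]].
  // The ClassTag context bound is required by RDD.flatMap for the result type.
  implicit class OptionRDDOps[A: ClassTag](rdd: RDD[Option[A]]) {
    def flatten: RDD[A] = rdd.flatMap(x => x)
  }
}

import RDDExtensions._
sc.parallelize(Seq(Some(1), None, Some(2))).flatten.collect()
> Array[Int] = Array(1, 2)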

You can also use flatMap and pass in a function that does the flattening, as in the other answer.

Upvotes: 0
