gleech

Reputation: 305

How to flatten tuples in Spark?

I'm looking to flatten an RDD of tuples (using an identity flatMap), but I'm getting a type error:

val fromTuples = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
val flattened = fromTuples.flatMap(x => x)
println(flattened.collect().mkString(", "))

This gives:

error: type mismatch;
 found   : (Int, String)
 required: TraversableOnce[?]
       val flattened = fromTuples.flatMap(x => x)

The equivalent list of Lists or Arrays works fine, e.g.:

val fromList = sc.parallelize(List(List(1, 2), List(3, 4)))
val flattened = fromList.flatMap(x => x)
println(flattened.collect().mkString(", "))

Can Scala handle this? If not, why not?

Upvotes: 6

Views: 8475

Answers (5)

sfosdal

Reputation: 804

There isn't a great way, but you can preserve a little type safety with this method:

val fromTuples = session.sparkContext.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(t => Seq(t._1, t._2))
println(flattened.collect().mkString)

The element type of the flattened RDD will be the nearest common supertype of all the types in the tuple. Which, yes, in this case is Any, but if the list were List(("1", "a"), ("2", "b")), it would preserve the String type.
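
For example, a minimal sketch with homogeneous tuples (assuming sc is an existing SparkContext, as in the question):

val fromStrings = sc.parallelize(List(("1", "a"), ("2", "b")))
// Both fields are Strings, so the inferred element type is String rather than Any
val flatStrings: org.apache.spark.rdd.RDD[String] = fromStrings.flatMap(t => Seq(t._1, t._2))
println(flatStrings.collect().mkString(", ")) // 1, a, 2, b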

Upvotes: 4

Brian Clapper

Reputation: 26230

Tuples aren't collections. Unlike Python, where a tuple is essentially just an immutable list, a tuple in Scala is more like a class (or more like a Python namedtuple). You can't "flatten" a tuple, because it's a heterogeneous group of fields.

You can convert a tuple to something iterable by calling .productIterator on it, but what you get back is an Iterator[Any]. You can certainly flatten such a thing, but you've lost all compile-time type protection that way. (Most Scala programmers shudder at the thought of a collection of type Any.)
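
To illustrate in plain Scala, outside Spark (a minimal sketch):

val pair = (1, "a")
// productIterator yields the tuple's fields as an Iterator[Any]
val fields: Iterator[Any] = pair.productIterator
println(fields.toList) // List(1, a), but statically typed as List[Any]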

Upvotes: 11

evan.oman

Reputation: 5572

As others have said, there isn't a great way to do this, especially with respect to type safety.

However, if you just want to print out the RDD in a nice flat format, you can map the RDD and use mkString:

scala> val myRDD = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
myRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> myRDD.map{case (a,b) => s"$a,$b"}.collect.mkString(",")
res0: String = 1,a,2,b,3,c

Upvotes: 1

gleech

Reputation: 305

From Lyuben's comment, this actually can be done, sneakily:

sc.parallelize(List(("a", 1), ("c", 2), ("e", 4))).flatMap(_.productIterator).collect()

All honour to him. (Though as Brian notes, this will forgo type safety.)

Upvotes: 1

Balaji Reddy

Reputation: 5710

  val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
  // Wrap the tuple's fields in an Array so flatMap gets the TraversableOnce it expects;
  // the element type widens to Any, since Int and String share no closer supertype
  val flattened = fromTuples.flatMap(x => Array(x._1, x._2))
  flattened.collect()

The reason for your error is the contract of flatMap, as described in the Spark programming guide:

flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
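
In other words, the function passed to flatMap must return a collection, not a bare tuple. A minimal sketch of one way to satisfy that while keeping a useful element type (converting the Int to a String here is my own choice, not part of the original answer):

val asStrings = fromTuples.flatMap { case (i, s) => Seq(i.toString, s) }
// asStrings is an RDD[String]: each input tuple contributes two elements
asStrings.collect() // Array(1, a, 2, b, 3, c)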

Upvotes: 1
