Reputation: 305
I'm looking to flatten an RDD of tuples (using a no-op map), but I'm getting a type error:
val fromTuples = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
val flattened = fromTuples.flatMap(x => x)
println(flattened.collect().mkString(", "))
Gives
error: type mismatch;
 found   : (Int, String)
 required: TraversableOnce[?]
       val flattened = fromTuples.flatMap(x => x)
The equivalent list of Lists or Arrays works fine, e.g.:
val fromList = sc.parallelize(List(List(1, 2), List(3, 4)))
val flattened = fromList.flatMap(x => x)
println(flattened.collect().mkString(", "))
Can Scala handle this? If not, why not?
Upvotes: 6
Views: 8475
Reputation: 804
There isn't a great way, but you can preserve a little type safety with this method:
val fromTuples = session.sparkContext.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(t => Seq(t._1, t._2))
println(flattened.collect().mkString)
The type of flattened will be an RDD of the closest common supertype of all the element types in the tuple. In this case that is Any, but if the list were List(("1", "a"), ("2", "b")) it would preserve the String type.
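To make the inference concrete, here is a minimal sketch using plain Scala Lists in place of the RDD (an assumption made only so the snippet runs without Spark; the names mixed, flatMixed, strings and flatStrings are illustrative). The element type of the result is inferred from the Seq built inside flatMap:

```scala
// Plain Lists stand in for the RDD here (assumption, to keep the sketch self-contained).
val mixed = List((1, "a"), (2, "b"), (3, "c"))
// Int and String share no supertype narrower than Any, so the result is List[Any].
val flatMixed: List[Any] = mixed.flatMap(t => Seq(t._1, t._2))

val strings = List(("1", "a"), ("2", "b"))
// Both fields are String, so the element type stays String.
val flatStrings: List[String] = strings.flatMap(t => Seq(t._1, t._2))

println(flatMixed.mkString(","))   // 1,a,2,b,3,c
println(flatStrings.mkString(",")) // 1,a,2,b
```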
Upvotes: 4
Reputation: 26230
Tuples aren't collections. Unlike Python, where a tuple is essentially just an immutable list, a tuple in Scala is more like a class (or more like a Python namedtuple). You can't "flatten" a tuple, because it's a heterogeneous group of fields.
You can convert a tuple to something iterable by calling .productIterator on it, but what you get back is an Iterator[Any]. You can certainly flatten such a thing, but you've lost all compile-time type protection that way. (Most Scala programmers shudder at the thought of a collection of type Any.)
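A small sketch of what that loss looks like in practice, on a single tuple and without Spark (the names pair, fields and ints are illustrative, not from the question):

```scala
val pair = (1, "a")

// productIterator walks the tuple's fields, but only as Any.
val fields: List[Any] = pair.productIterator.toList

// Recovering a typed value needs a runtime pattern match;
// the compiler can no longer check it for you.
val ints: List[Int] = fields.collect { case i: Int => i }

println(fields) // List(1, a)
println(ints)   // List(1)
```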
Upvotes: 11
Reputation: 5572
As others have said, there isn't a great way to do this, especially with respect to type safety.
However, if you just want to print out the RDD in a nice flat format, you can just map the RDD and use mkString:
scala> val myRDD = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
myRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> myRDD.map{case (a,b) => s"$a,$b"}.collect.mkString(",")
res0: String = 1,a,2,b,3,c
Upvotes: 1
Reputation: 305
From Lyuben's comment, this actually can be done, sneakily:
sc.parallelize(List(("a", 1), ("c", 2), ("e", 4))).flatMap(_.productIterator).collect()
All honour to him. (Though as Brian notes, this will forgo type safety.)
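If the loss of type safety matters, one possible alternative (not something proposed in the answers here, only a sketch) is to wrap each field in Either, so that both types survive in the flattened result. Plain Lists are used in place of the RDD so the snippet runs without Spark, and the names tuples, flat and total are illustrative:

```scala
val tuples = List(("a", 1), ("c", 2), ("e", 4))

// Each tuple becomes two elements: Left for the String field, Right for the Int field.
val flat: List[Either[String, Int]] =
  tuples.flatMap { case (s, n) => List[Either[String, Int]](Left(s), Right(n)) }

// The compiler still knows that every Right holds an Int.
val total: Int = flat.collect { case Right(n) => n }.sum

println(flat)
println(total) // 7
```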
Upvotes: 1
Reputation: 5710
val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(x => Array(x))
flattened.collect()
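The reason for your error is that the function passed to flatMap must return a collection rather than a single item. From the Spark programming guide:
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).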
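As a rough illustration of "0 or more output items", here is a sketch on a plain Scala List rather than an RDD (an assumption made so it runs without Spark; the names numbers and repeated are illustrative):

```scala
val numbers = List(1, 2, 3)

// Each input n yields n - 1 copies of itself: 1 gives none, 2 gives one, 3 gives two.
val repeated: List[Int] = numbers.flatMap(n => List.fill(n - 1)(n))

println(repeated) // List(2, 3, 3)
```

The function returns a List every time, even when that List is empty, which is what lets flatMap concatenate the results.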
Upvotes: 1