Reputation: 855
In Scala Spark, several methods can result in data partitioning/repartitioning. These include partitionBy, coalesce, repartition, and textFile, among other functions that take a partition count as a parameter. Below, I use textFile with a specification of at least 8 partitions. I do not want transformations to undo these partitions, so I persist the partitioning result. However, functions such as map and flatMap do not preserve the partitioning, which I believe can hurt performance. Pair RDDs have mapValues and flatMapValues, which do maintain the partitioning.
Are there equivalent functions on Datasets and RDDs for map and flatMap that do not screw up partitions?
If I have this all mixed up, how do RDDs and Datasets maintain their partitions, keeping in mind that map and flatMap operations are key in their manipulation?
import mySpark.implicits._ // needed for toDS()
val tweets: RDD[Tweet] = mySpark.sparkContext.textFile(path, 8).map(parseTweet).persist()
val numerical_fields_Tweets: Dataset[Tweet] = tweets.toDS()
Upvotes: 2
Views: 230
Reputation: 13548
In Spark, operations that do not repartition/shuffle the data preserve partitions (by operating on the previously established partitions). map and flatMap are such operations: they will not alter the number of partitions. Further, map will not change the number of rows within partitions or their order.
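You can check this directly with getNumPartitions. A minimal sketch, assuming an existing SparkContext named sc (e.g., obtained from spark.sparkContext):

```scala
// Assumes an existing SparkContext `sc`.
val rdd = sc.parallelize(1 to 100, 8)      // explicitly request 8 partitions
val mapped = rdd.map(_ * 2)                // narrow transformation, no shuffle
val flat = mapped.flatMap(x => Seq(x, -x)) // also narrow

println(rdd.getNumPartitions)    // 8
println(mapped.getNumPartitions) // 8 -- map kept the partition count
println(flat.getNumPartitions)   // 8 -- flatMap did too, even though row counts changed
```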
how do RDDs and Datasets maintain their partitions
You are mixing two concepts: (1) the partitioner associated with data at a point in its transformation and (2) the partitions the data is split into.
There is a difference between how data is partitioned vs. which partitioner is associated with the data. As explained above, map and flatMap do not change the number of partitions, but they make no guarantees about the partitioner associated with the data. Consider RDD's map:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
and MapPartitionsRDD:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false, ...)
So, while map doesn't repartition the data, it makes no guarantees about the partitioner associated with the data because there are no restrictions on how map may modify rows.
Pair RDDs, i.e., RDD[(K, V)], are somewhat special in that they are often the result of a partitioning operation and, if we use mapValues instead of map, we can be certain that the partitioner has not changed because we have not touched the "keys".
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
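To make the difference concrete, here is a minimal sketch (again assuming an existing SparkContext named sc) showing that mapValues keeps the partitioner while an equivalent map discards it:

```scala
import org.apache.spark.HashPartitioner

// Assumes an existing SparkContext `sc`.
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new HashPartitioner(4)) // associates a partitioner with the data

// mapValues leaves the keys untouched, so the partitioner survives.
println(pairs.mapValues(_.toUpperCase).partitioner.isDefined) // true

// map could have changed the keys, so Spark drops the partitioner.
println(pairs.map { case (k, v) => (k, v.toUpperCase) }.partitioner.isDefined) // false
```

A downstream reduceByKey or join on the mapValues result can therefore reuse the existing partitioning and avoid a shuffle, while the map result would have to be repartitioned first.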
Hope this helps!
Upvotes: 2