Reputation: 730
This is a quote from jaceklaskowski.gitbooks.io.
Some operations, e.g. map, flatMap, filter, don’t preserve partitioning. map, flatMap, filter operations apply a function to every partition.
I don't understand why filter does not preserve partitioning. It just takes the subset of each partition whose elements satisfy a condition, so I think the partitioning can be preserved. Why isn't it like that?
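For example (just a sketch against a local SparkContext sc, nothing from the quoted book), tagging every element with its partition index before and after a filter shows that nothing moves between partitions:
// hash-partition some (key, value) pairs, then check where each element lives
val pairs = sc.parallelize(1 to 10).map(x => (x % 3, x))
val partitioned = pairs.partitionBy(new org.apache.spark.HashPartitioner(3))

def locate(rdd: org.apache.spark.rdd.RDD[(Int, Int)]) =
  rdd.mapPartitionsWithIndex((idx, it) => it.map(kv => (idx, kv))).collect()

locate(partitioned)
locate(partitioned.filter(_._2 % 2 == 0))
// every surviving (key, value) pair reports the same partition index as before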
Upvotes: 6
Views: 1699
Reputation: 35219
You are of course right. The quote is just incorrect. filter does preserve partitioning (for the reason you've already described), and it is trivial to confirm:
val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
  new org.apache.spark.HashPartitioner(11)
)
rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)
val filteredRDD = rdd.filter(_._1 == 3)
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)
rdd.partitioner == filteredRDD.partitioner
// Boolean = true
This stands in contrast to operations like map, which don't preserve the Partitioner:
rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None
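The reason is that map is free to change the keys, so Spark conservatively drops the partitioner. mapValues, which cannot touch the keys, keeps it (a small sketch reusing the rdd defined above):
// the keys are untouched, so the HashPartitioner is still valid
rdd.mapValues(identity).partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)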
Datasets are a bit more subtle, as filters are normally pushed down, but overall the behavior is similar.
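For instance (a rough sketch, assuming a SparkSession named spark), a filter on a Dataset that was repartitioned by a column does not introduce an additional shuffle, which can be checked in the physical plan:
import spark.implicits._

val ds = spark.range(0, 10).repartition($"id")
ds.filter($"id" % 3 === 0).explain()
// the Filter sits above the single Exchange; no extra shuffle is added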
Upvotes: 5
Reputation: 27373
Filter does preserve partitioning; at least, this is suggested by the source code of filter (preservesPartitioning = true):
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
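The same flag is exposed on mapPartitions, so a per-partition transformation that is known to leave the keys untouched can opt in to keeping the existing partitioner (a sketch, assuming a SparkContext sc; not part of the original question):
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new org.apache.spark.HashPartitioner(2))

// the values change but the keys do not, so it is safe to keep the partitioner
val kept = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v.toUpperCase) },
  preservesPartitioning = true)

kept.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)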
Upvotes: 4