Hoori M.

Reputation: 730

Why filter does not preserve partitioning?

This is a quote from jaceklaskowski.gitbooks.io.

Some operations, e.g. map, flatMap, filter, don’t preserve partitioning. map, flatMap, filter operations apply a function to every partition.

I don't understand why filter does not preserve partitioning. It's just getting a subset of each partition which satisfy a condition so I think partitions can be preserved. Why isn't it like that?

Upvotes: 6

Views: 1699

Answers (2)

Alper t. Turker

Reputation: 35219

You are of course right. The quote is simply incorrect: filter does preserve partitioning (for the reason you've already described), and it is trivial to confirm:

val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
  new org.apache.spark.HashPartitioner(11)
)

rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

val filteredRDD = rdd.filter(_._1 == 3)
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

rdd.partitioner == filteredRDD.partitioner
// Boolean = true

This is in contrast to operations like map, which don't preserve partitioning (the Partitioner is dropped):

rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None
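
The dividing line is whether Spark can prove the keys are unchanged. mapValues, which by contract cannot touch the keys, does keep the partitioner, as a quick check in the same spark-shell session shows:

rdd.mapValues(identity).partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)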

Datasets are a bit more subtle: filters are normally pushed down to the data source, but overall the behavior is similar.
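
For illustration, a minimal sketch of the Dataset case (the path below is hypothetical, and the exact plan output varies by Spark version):

// Write some data, then filter on read. Run in spark-shell, where
// spark.implicits._ is already in scope for the $ column syntax.
spark.range(100).write.mode("overwrite").parquet("/tmp/filter_demo")
spark.read.parquet("/tmp/filter_demo").filter($"id" > 50).explain()
// The physical plan should list the predicate under PushedFilters,
// e.g. PushedFilters: [IsNotNull(id), GreaterThan(id,50)]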

Upvotes: 5

Raphael Roth

Reputation: 27373

filter does preserve partitioning; at least, that is what the source code of filter suggests (preservesPartitioning = true):

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
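
For comparison, map in the same source file builds its MapPartitionsRDD without passing the flag, so preservesPartitioning falls back to its default of false:

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

And if you know your function keeps the keys intact, mapPartitions exposes the flag directly, so you can opt in yourself (a sketch, reusing rdd from the first answer):

// Safe here because the function provably leaves keys untouched:
rdd.mapPartitions(_.map { case (k, v) => (k, v) }, preservesPartitioning = true)
  .partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)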

Upvotes: 4
