angelcervera

Reputation: 4177

Why are some partitions processed twice when mapPartitions is used with toDF()?

I need to process the data partition by partition (long story).

Using mapPartitions works fine with RDDs. In the example below, rdd.mapPartitions(mapper).collect() behaves as expected.

But when transforming to a DataFrame, one partition is processed twice.

Why is this happening, and how can I avoid it?

Below is the output of the simple example that follows. You can see that the function is executed 3 times even though there are only two partitions. One of the partitions, [Row(id=1), Row(id=2)], is processed twice. Curiously, one of those executions is ignored, as we can see in the resulting DataFrame.

size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=3), Row(id=4)]
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
> Mapper executions: 3

Simple example used:

from typing import Iterator

from pyspark.sql import Row
from pyspark.sql import SparkSession


def gen_random_row(id: int):
    return Row(id=id)


if __name__ == '__main__':
    spark = SparkSession.builder.master("local[1]").appName("looking for the error").getOrCreate()
    executions_counter = spark.sparkContext.accumulator(0)

    rdd = spark.sparkContext.parallelize([
        gen_random_row(1),
        gen_random_row(2),
        gen_random_row(3),
        gen_random_row(4),
    ], 2)

    def mapper(iterator: Iterator[Row]) -> Iterator[Row]:
        executions_counter.add(1)
        lst = list(iterator)
        print(f"size: {len(lst)} > values: {lst}")
        for r in lst:
            yield r

    # rdd.mapPartitions(mapper).collect()
    rdd.mapPartitions(mapper).toDF().show()

    print(f"> Mapper executions: {executions_counter.value}")

    spark.stop()

Upvotes: 1

Views: 1141

Answers (1)

angelcervera

Reputation: 4177

The solution is to pass the schema to toDF.

It looks like Spark processes one extra partition in order to infer the schema.

To solve it:

    from pyspark.sql.types import IntegerType, StructField, StructType

    schema = StructType([StructField("id", IntegerType(), True)])
    rdd.mapPartitions(mapper).toDF(schema).show()

With this code, every partition is processed exactly once.

Upvotes: 2
