Reputation: 4177
I need to process my data partition by partition (long story).
Using mapPartitions works fine with RDDs. In the example below, rdd.mapPartitions(mapper).collect() behaves as expected.
But when converting the result to a DataFrame, one partition is processed twice.
Why is this happening, and how can I avoid it?
Below is the output of the simple example that follows. The function is executed 3 times even though there are only two partitions: the partition [Row(id=1), Row(id=2)] is processed twice.
Curiously, one of those executions is discarded, as the resulting DataFrame shows.
size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=1), Row(id=2)]
size: 2 > values: [Row(id=3), Row(id=4)]
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
> Mapper executions: 3
Simple example used:
from typing import Iterator

from pyspark import Row
from pyspark.sql import SparkSession


def gen_random_row(id: str):
    return Row(id=id)


if __name__ == '__main__':
    spark = SparkSession.builder.master("local[1]").appName("looking for the error").getOrCreate()
    executions_counter = spark.sparkContext.accumulator(0)

    rdd = spark.sparkContext.parallelize([
        gen_random_row(1),
        gen_random_row(2),
        gen_random_row(3),
        gen_random_row(4),
    ], 2)

    def mapper(iterator: Iterator[Row]) -> Iterator[Row]:
        executions_counter.add(1)
        lst = list(iterator)
        print(f"size: {len(lst)} > values: {lst}")
        for r in lst:
            yield r

    # rdd.mapPartitions(mapper).collect()
    rdd.mapPartitions(mapper).toDF().show()

    print(f"> Mapper executions: {executions_counter.value}")
    spark.stop()
Upvotes: 1
Views: 1141
Reputation: 4177
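The solution is to pass the schema to toDF.
Without an explicit schema, Spark appears to process one partition first in order to infer the schema, and then processes every partition again to build the DataFrame. That is why the first partition shows up twice and the extra run is discarded.
To solve it:
from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([StructField("id", IntegerType(), True)])
rdd.mapPartitions(mapper).toDF(schema).show()
With this code, every partition is processed exactly once.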
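To illustrate the reasoning without a Spark cluster, here is a toy model in plain Python. It is only a sketch: ToyRDD, to_df and run are hypothetical names invented for this illustration and are not part of PySpark, and the model assumes that inference needs to peek at the first row of an uncached, lazily computed dataset.

```python
from typing import Callable, Iterator, List, Optional


class ToyRDD:
    """Hypothetical stand-in for a lazy, uncached RDD (not a PySpark class)."""

    def __init__(self, partitions: List[List[int]],
                 fn: Callable[[Iterator[int]], Iterator[int]]):
        self.partitions = partitions
        self.fn = fn

    def first(self) -> int:
        # An action that needs one row: runs fn on the first partition only.
        return next(self.fn(iter(self.partitions[0])))

    def collect(self) -> List[int]:
        # An action that needs all rows: runs fn on every partition.
        return [row for p in self.partitions for row in self.fn(iter(p))]

    def to_df(self, schema: Optional[str] = None) -> List[int]:
        if schema is None:
            # Assumption: with no schema, peek at the first row to infer one.
            schema = type(self.first()).__name__
        return self.collect()


calls = []  # records every partition the mapper has been run on


def mapper(it: Iterator[int]) -> Iterator[int]:
    lst = list(it)
    calls.append(lst)
    yield from lst


def run(schema: Optional[str]):
    calls.clear()
    rows = ToyRDD([[1, 2], [3, 4]], mapper).to_df(schema)
    return rows, len(calls)


inferred = run(None)   # schema inferred: mapper runs on partition 0 twice
explicit = run("int")  # schema given: mapper runs once per partition
print(inferred, explicit)
```

In this model the returned rows are the same in both cases; only the number of mapper executions differs, which matches what the accumulator in the question reports.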
Upvotes: 2