Reputation: 1
I'm working with PySpark to process large amounts of data. However, I noticed that the function called by mapPartitions
is executed one more time than expected. For instance, in the following code block, the reformat
function should be called four times but is called five times: four times when the DataFrame is cached and a fifth time when the show
method is invoked.
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.host", "localhost").appName('SparkByExamples.com').getOrCreate()
data = [
('James','Smith','M',3000),
('James','Smith','M',3000),
('James','Smith','M',3000),
('James','Smith','M',3000),
('Anna','Rose','F',4100),
('Anna','Rose','F',4100),
('Anna','Rose','F',4100),
('Anna','Rose','F',4100),
('Robert','Williams','M',6200),
('Robert','Williams','M',6200),
('Robert','Williams','M',6200),
('Robert','Williams','M',6200),
]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
#Example 1 mapPartitions()
def reformat(partitionData):
for row in partitionData:
yield [row.firstname+","+row.lastname,row.salary*10/100]
df2=df.repartition(4).rdd.mapPartitions(reformat).toDF(["name","bonus"])
df2.cache()
df2.show()
In this case, the time may not be an issue. However, for large datasets, it could become a significant problem. Additionally, if the function involves calls to an external API to fetch data or evaluate some logic, it might lead to overlapping or redundant API requests.
I was testing the example above, with a more simple logic, but I have not identified the problem.
Upvotes: 0
Views: 87
Reputation: 56
I have tested the code:
%python
from pyspark.sql import SparkSession
from pyspark import StorageLevel
# Initialize Spark Session
spark = SparkSession.builder.config("spark.driver.host", "localhost").appName('SparkByExamples.com').getOrCreate()
# Sample data
data = [
('James', 'Smith', 'M', 3000),
('James', 'Smith', 'M', 3000),
('James', 'Smith', 'M', 3000),
('James', 'Smith', 'M', 3000),
('Anna', 'Rose', 'F', 4100),
('Anna', 'Rose', 'F', 4100),
('Anna', 'Rose', 'F', 4100),
('Anna', 'Rose', 'F', 4100),
('Robert', 'Williams', 'M', 6200),
('Robert', 'Williams', 'M', 6200),
('Robert', 'Williams', 'M', 6200),
('Robert', 'Williams', 'M', 6200),
]
columns = ["firstname", "lastname", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
# Example 1 mapPartitions()
def reformat(partitionData):
with open("/tmp/spark_debug4.log", "a") as f:
f.write("Reformat function called for a partition\n")
for row in partitionData:
yield [row.firstname + "," + row.lastname, row.salary * 10 / 100]
# Repartition the DataFrame and apply mapPartitions
df2 = df.repartition(4).rdd.mapPartitions(reformat).toDF(["name", "bonus"])
# Persist the DataFrame
df2.persist(StorageLevel.MEMORY_AND_DISK)
df2.count()
# Trigger actions to force computation
df2.show()
print(df2.rdd.getNumPartitions())
The print statement in the reformat calls 5 times:
%sh
cat /tmp/spark_debug4.log
INFO:py4j.clientserver:Received command c on object id p0
Reformat function called for a partition
Reformat function called for a partition
Reformat function called for a partition
Reformat function called for a partition
Reformat function called for a partition
You have given 4 partitions, but for count() and show(), the reformat function is called 5 times. Extra Partition for Driver execution(Sometimes, Spark may create an additional metadata partition or manage splits).
The mapPartitions transformation operates on logical partitions, and sometimes on intermediate partitions (Ex: from shuffles or internal processing) may trigger additional calls.
Upvotes: 0
Reputation: 56
When you call df2.cache(), Spark begins to cache the result of the DataFrame df2, but the action to materialize this cache is still delayed until the action is executed.
The transformation pipeline is executed two times when the df2.cache() call is made and df2.show() is executed.
The correct order of executions is below: Make changes in your code:
df2.count() #Transformations will be materialized
df2.cache()
df2.show() # Now it fetches from the cached data
Upvotes: 0