Kenny

Reputation: 1982

Are .withColumn and .agg computed in parallel in PySpark?

Consider, for example:

(df.withColumn("customr_num", col("customr_num").cast("integer"))
   .withColumn("customr_type", col("customr_type").cast("integer"))
   .agg(myMax(sCollect_list("customr_num")).alias("myMaxCustomr_num"),
        myMean(sCollect_list("customr_type")).alias("myMeanCustomr_type"),
        myMean(sCollect_list("customr_num")).alias("myMeancustomr_num"),
        sMin("customr_num").alias("min_customr_num"))
   .show())

Are .withColumn and the functions inside agg (sMin, myMax, myMean, etc.) computed in parallel by Spark, or in sequence?

If they run sequentially, how can we parallelize them?

Upvotes: 2

Views: 3445

Answers (1)

Oli

Reputation: 10406

By design, as long as you have more than one partition, operations are always parallelized in Spark. If what you mean, though, is whether the withColumn operations are going to be computed in one pass over the dataset, then the answer is also yes. In general, you can use the Spark UI to learn more about how things are computed.

Let's take an example that's very similar to yours.

spark.range(1000)
    .withColumn("test", 'id cast "double")
    .withColumn("test2", 'id + 10)
    .agg(sum('id), mean('test2), count('*))
    .show

And let's have a look at the UI.

[Screenshot of the Spark UI for this query]

Range corresponds to the creation of the data; then you have Project (the two withColumn operations) and then the aggregation (agg) within each partition (we have 2 here). Within a given partition, these steps are done sequentially, but all the partitions are processed at the same time. Also, they are in the same stage (one blue box), which means that they are all computed in one pass over the data.

Then there is a shuffle (Exchange), which means that data (the result of the aggregations per partition) is exchanged over the network. The final aggregation is then performed (HashAggregate) and the result is sent to the driver (collect).
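To make the flow described above more concrete, here is a minimal plain-Python sketch of the same idea. It is only a conceptual model, not how Spark is implemented. The names partial_aggregate and final_aggregate, the use of threads, and the split of the range into two halves are all assumptions made for illustration. The unused "test" column is left out for brevity.

```python
from concurrent.futures import ThreadPoolExecutor


def partial_aggregate(partition):
    """One pass over a partition: project per row, then update partial aggregates."""
    sum_id = 0      # partial sum(id)
    sum_test2 = 0   # partial sum(test2), needed later for mean(test2)
    rows = 0        # partial count(*)
    for id_ in partition:
        test2 = id_ + 10   # projection (the withColumn step), applied row by row
        sum_id += id_
        sum_test2 += test2
        rows += 1
    return sum_id, sum_test2, rows


def final_aggregate(partials):
    """Merge the small per-partition results (the step after the exchange)."""
    sum_id = sum(p[0] for p in partials)
    sum_test2 = sum(p[1] for p in partials)
    rows = sum(p[2] for p in partials)
    return sum_id, sum_test2 / rows, rows


# Hypothetical split of range(1000) into two partitions.
partitions = [range(0, 500), range(500, 1000)]

# Partitions are processed concurrently; work inside each one is sequential.
with ThreadPoolExecutor(max_workers=2) as pool:
    partials = list(pool.map(partial_aggregate, partitions))

result = final_aggregate(partials)
print(result)  # (499500, 509.5, 1000) -> sum(id), mean(test2), count(*)
```

Note that only three numbers per partition cross the "exchange" in this sketch, which is why the shuffle in this kind of aggregation is cheap compared to the pass over the data itself.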

Upvotes: 4
