Reputation: 309
I am trying to calculate the mean of a certain column and save it as a new column. Here is the code snippet I use to achieve this:
df = df.withColumn("avg_colname", lit(df.select(avg("colname").as("temp")).first().getAs("temp")))
In total, there are 8 columns to be calculated. On a small 3-node cluster using the "spark-submit" command, the code execution takes much more time than on a single machine using the "spark-shell" command (several minutes vs. a few seconds).
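For illustration, repeating the same pattern for the other columns looks roughly like this (the column names are placeholders, not my real ones):
import org.apache.spark.sql.functions.{avg, lit}

// each iteration calls .first(), so every column triggers its own Spark job
val columns = Seq("col_1", "col_2", "col_3") // ... 8 columns in total
var result = df
for (c <- columns) {
  val mean = result.select(avg(c).as("temp")).first().getAs[Double]("temp")
  result = result.withColumn(s"avg_$c", lit(mean))
}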
Why does the code execute more slowly on a cluster than on a single machine, and how can the code snippet above be improved?
Upvotes: 1
Views: 182
Reputation: 792
I agree with Raphael on keeping it to 1 machine and 1 partition if the data is small. In addition, I wanted to add my answer to give you some more insight into what is happening under the hood.
The code you've asked about has the explain plan as shown below (note: I'm only showing the calculation of the average; the addition of the column is a lazy transformation, so it won't trigger an actual computation):
scala> df.select(avg("colname").as("temp")).explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[avg(cast(col_2#6 as bigint))])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_avg(cast(col_2#6 as bigint))])
      +- LocalTableScan [col_2#6]
As shown in the plan above, an exchange takes place to calculate the average value.
This answers the question of why you see a slowdown when going to a 3-node cluster: the exchange results in a network shuffle of data across partitions, and network operations are much slower than in-memory or cache operations.
As for your second question, how to improve it will depend on the details of your application. You can't really change much in the code, since you still need to calculate the average. But there are a couple of things you can look at, and the one I would recommend is the one Raphael also suggests: if your data is small, keep it on 1 machine and in 1 partition. It is generally low-hanging fruit in such situations and will be much simpler to achieve if you know your data and its use case.
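A minimal sketch of that recommendation, assuming the data comfortably fits in a single partition ("colname" as in the question):
import org.apache.spark.sql.functions.avg

// collapse to a single partition first (only sensible for small data), so the whole
// aggregation runs on one node and the exchange has essentially nothing left to move
val avgValue = df.coalesce(1)
  .select(avg("colname").as("temp"))
  .first()
  .getAs[Double]("temp")
// alternatively, keep everything on one machine by submitting with --master local[*]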
Upvotes: 1
Reputation: 27383
You could rewrite your code using window functions:
df = df.withColumn("avg_colname", avg("colname").over())
// add other columns
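For the remaining columns, the same empty window can simply be reused; a rough sketch with placeholder column names:
import org.apache.spark.sql.functions.avg

// an empty over() averages over the whole DataFrame, but note that it moves
// all rows into a single partition to do so
df = df
  .withColumn("avg_col_1", avg("col_1").over())
  .withColumn("avg_col_2", avg("col_2").over())
// ... repeat for the remaining columns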
Or alternatively, join in the averages:
df = df.crossJoin(broadcast(
df.agg(
avg("colname").as("avg_colname")
// add other columns
)
))
The two approaches should give the same result, but they work in different ways: the window function will move all data to 1 partition, while the second approach uses partial aggregation and will scale better for big datasets.
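As a rough sketch with placeholder column names, the second approach for all columns stays a single aggregation followed by one broadcast join:
import org.apache.spark.sql.functions.{avg, broadcast}

// one pass over the data computes every average at once
val avgs = df.agg(
  avg("col_1").as("avg_col_1"),
  avg("col_2").as("avg_col_2")
  // ... one avg per column, 8 in total
)
df = df.crossJoin(broadcast(avgs))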
You may also try to cache the initial dataframe and check whether this helps. Also note that if your data is small, distributed computing only adds overhead and makes it slower. If you know the data is small, it is probably best to use 1 machine and 1 partition...
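A minimal sketch of the caching suggestion, assuming the same dataframe is reused for all eight averages:
// cache the dataframe so repeated aggregations don't recompute or re-read the source;
// an action such as count() materializes the cache
df.cache()
df.count()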
Upvotes: 3