Reputation: 65
I'm trying to set up a PySpark job that estimates p25, p50, p75, and p90 on a daily incoming volume of ~700GB of data. I'm running the job with 40 worker nodes, each with 32G memory and 8 vCPUs, and yet it takes ~15 hours to complete. I'm assuming the delay is due to the values needing to be sorted across nodes to compute the percentiles. Is there an alternative that can speed up this process?
Schema of input data -
root
|-- processed_date: date (nullable = true)
|-- id: string (nullable = true)
|-- experiment: string (nullable = true)
|-- type: string (nullable = true)
|-- value: double (nullable = true)
|-- revision: string (nullable = true)
|-- source: string (nullable = true)
|-- region: string (nullable = true)
from pyspark.sql import functions as F
df_agg = df.groupby('processed_date', 'id', 'experiment', 'type').agg(
F.min('value').alias('min'),
F.max('value').alias('max'),
F.avg('value').alias('avg'),
F.expr('percentile(value, 0.25)').alias('p25'),
F.expr('percentile(value, 0.50)').alias('p50'),
F.expr('percentile(value, 0.75)').alias('p75'),
F.expr('percentile(value, 0.90)').alias('p90'))
Thanks!
Upvotes: 1
Views: 598
Reputation: 8410
Using only columns to repartition by means Spark applies a hash partitioner on those columns with spark.sql.shuffle.partitions as the number of partitions (the default is 200), so if the default shuffle partition count is not adequate this will not work well.
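As a quick check (a sketch; it assumes a SparkSession named spark and the df from the question), you can see that a column-only repartition falls back to that setting:
# Column-only repartition uses spark.sql.shuffle.partitions for the partition count
spark.conf.get("spark.sql.shuffle.partitions")  # '200' unless overridden
df.repartition('processed_date', 'id', 'experiment', 'type').rdd.getNumPartitions()  # 200 by default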
You should set numPartitions as well as the column expressions. For this case, I would do something like this:
df=df.repartition(1000, *['processed_date', 'id', 'experiment', 'type'])
Or, before applying repartition (using only columns), set the shuffle partitions:
spark.conf.set("spark.sql.shuffle.partitions",1000)
df=df.repartition(*['processed_date', 'id', 'experiment', 'type'])`
I would suggest you repartition and persist (allowing spill to disk) before applying the groupBy, so that you get adequate partitioning and in-memory computation in a single pass: even data that spills to disk will still be faster than not keeping anything in memory at all.
from pyspark.storagelevel import StorageLevel
df=df.repartition(1000, *['processed_date', 'id', 'experiment', 'type'])\
.persist(StorageLevel.MEMORY_AND_DISK)
numPartitions is computed as workers * cores * (2 or 3) (since almost all modern virtual cores are multi-threaded), which gives 40 * 8 * 3 = 960; I rounded that up to 1000.
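Putting it together, a minimal end-to-end sketch combining this with the aggregation from the question (the output path is just a placeholder):
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel

# Repartition by the grouping keys and cache, spilling to disk if it does not fit in memory
df = df.repartition(1000, 'processed_date', 'id', 'experiment', 'type') \
       .persist(StorageLevel.MEMORY_AND_DISK)

df_agg = df.groupby('processed_date', 'id', 'experiment', 'type').agg(
    F.min('value').alias('min'),
    F.max('value').alias('max'),
    F.avg('value').alias('avg'),
    F.expr('percentile(value, 0.25)').alias('p25'),
    F.expr('percentile(value, 0.50)').alias('p50'),
    F.expr('percentile(value, 0.75)').alias('p75'),
    F.expr('percentile(value, 0.90)').alias('p90'))

# Writing the result triggers the computation; release the cache afterwards
df_agg.write.mode('overwrite').parquet('/path/to/output')  # placeholder path
df.unpersist()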
Upvotes: 1
Reputation: 993
You can try repartitioning the dataframe on the grouping columns with DataFrame.repartition:
df = df.repartition('processed_date', 'id', 'experiment', 'type')
so that all the records for a given combination of those columns end up in the same partition (and hence on the same node).
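If you want to verify that the subsequent groupBy reuses this partitioning instead of shuffling again, a quick sketch (assuming F is pyspark.sql.functions, as in the question) is to inspect the plan:
df = df.repartition('processed_date', 'id', 'experiment', 'type')
df.groupby('processed_date', 'id', 'experiment', 'type') \
    .agg(F.expr('percentile(value, 0.50)').alias('p50')) \
    .explain()
# The physical plan should show a single Exchange hashpartitioning on these columns,
# with the aggregation running on top of it rather than adding a second shuffle.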
Upvotes: 0