Reputation: 7997
I'm working in PySpark 2.3 and I'm trying to figure out the most efficient way to get some aggregate statistics from a dataframe.
I have a dataframe with 1.5bn records spread out over a relatively small cluster of 10 nodes. Each node has 16 GB of RAM and 4 cores. My replication factor is set to 2.
My dataframe has maybe 15 columns, which are a mixture of data types, but I'm only interested in two of them: ID and eventDate. The code I'd like to run is pretty simple:
from pyspark.sql import functions as F

output = df.groupby(['ID']).agg(F.min('eventDate').alias("firstDate"),
                                F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet', mode='overwrite')
What I'm trying to figure out is the most efficient way to do this operation. ID, the field I'm grouping by, has about 12 million distinct values, and df.rdd.getNumPartitions() currently returns 642.
Am I better off first projecting my dataframe down to just the two columns I want? With so many IDs, should I repartition my dataset first? Should I drop duplicates? I could run something like this before my groupby:
df = df[['ID','eventDate']].drop_duplicates().repartition(x)
or
df = df[['ID','eventDate']].repartition(x)
I'm struggling a little to figure out what will optimize the run-time. Any guidance on estimating run-time in advance would be greatly appreciated. I'd prefer not to just "test it out" if possible, because I have a couple of these queries to run and each will take a while.
Upvotes: 7
Views: 12313
Reputation: 1864
It might not be the answer you were looking for, but the optimal code for this operation is exactly what you already have:
output = df.groupby(['ID']) \
    .agg(F.min('eventDate').alias("firstDate"),
         F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet', mode='overwrite')
Spark optimises the process by first selecting only the columns it needs for the entire operation (ID and eventDate). Afterwards it partitions your data by ID and runs the aggregation on each partition.
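If you want to confirm this without running the full job, you can look at the physical plan. The snippet below is only a sketch and the exact output depends on your data source, but for Parquet you should see the scan pruned down to ID and eventDate and a partial min/max aggregation before the shuffle.

from pyspark.sql import functions as F

output = df.groupby(['ID']).agg(
    F.min('eventDate').alias("firstDate"),
    F.max('eventDate').alias("lastDate"))

# Prints the physical plan without executing the job. Look for a pruned
# ReadSchema containing only ID and eventDate, and a partial_min/partial_max
# HashAggregate that runs before the Exchange (shuffle).
output.explain()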
Allowing the maximum number of executors will definitely help. Based on your description, I'd suggest setting spark.executor.instances=10 and spark.executor.memory=10g. 12 million distinct IDs is a fair amount, so you could also boost the number of shuffle partitions, e.g. spark.sql.shuffle.partitions=400, just so you won't get some annoying memory overhead exceptions.
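For reference, here's a minimal sketch of how those settings could be applied. Note that executor count and memory normally have to be supplied when the application is launched (e.g. via spark-submit flags) rather than changed on an already running session, so this assumes you're starting fresh.

from pyspark.sql import SparkSession

# Illustrative values from the suggestion above; adjust to your cluster.
# Equivalent spark-submit flags: --num-executors 10 --executor-memory 10g
# --conf spark.sql.shuffle.partitions=400
spark = SparkSession.builder \
    .config("spark.executor.instances", "10") \
    .config("spark.executor.memory", "10g") \
    .config("spark.sql.shuffle.partitions", "400") \
    .getOrCreate()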
Upvotes: 6
Reputation: 19
@flyingmeatball,
Before doing the aggregation, do the following steps:
1. Drop the data you don't need (unwanted data eats your resources).
2. Repartition and cache the data to suit your workload (this reduces execution time); see the sketch after this list.
Hint: if the data comes from Cassandra, repartition it by the partition key so that you avoid shuffling data.
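A minimal sketch of steps 1 and 2, using the column names from the question; the partition count of 400 is only illustrative, and the cache only pays off if the slimmed-down dataframe is reused more than once:

from pyspark.sql import functions as F

# Step 1: keep only the columns the aggregation needs.
slim = df.select('ID', 'eventDate')

# Step 2: repartition by the grouping key and cache it (400 is illustrative).
slim = slim.repartition(400, 'ID').cache()

output = slim.groupby('ID').agg(
    F.min('eventDate').alias("firstDate"),
    F.max('eventDate').alias("lastDate"))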
Now you are good to go for the aggregation logic ;)
Thanks,
Vimalesh
Upvotes: -2