varun

Reputation: 41

spark - java heap out of memory when doing groupby and aggregation on a large dataframe

I'm new to Spark and have no programming experience in Java. I'm using PySpark to process a very large time-series dataset with close to 4000 numeric (float) columns and billions of rows.

What I want to achieve with this dataset is the following:

The time-series data is at 10ms intervals. I want to group the data by 1s intervals and use mean as the aggregation function.

Here is the code that I'm using to read the partitioned parquet files.

df = (spark.read.option("mergeSchema", "true")
           .parquet("/data/"))

Here is the piece of code for groupby and aggregation that I wrote:

from pyspark.sql.functions import mean

col_list = [... list of numeric columns in the dataframe ...]

agg_funcs = [mean]   # I also want to add other aggregation functions here later.

exprs     = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]

result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
            .agg(*exprs))

Now, I want to write the above result dataframe to a partitioned parquet:

(result.write.mode("overwrite")
       .partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
       .parquet('/out/'))

But I get a Java heap out-of-memory error.

I tried increasing spark.sql.shuffle.partitions so that each partition would be smaller, but that didn't help.
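(A minimal sketch of the other way this setting can be changed, on an existing session; the value 2000 is only a placeholder, not a recommendation:)

# Sketch: spark.sql.shuffle.partitions can also be set at runtime on the session;
# 2000 is only a placeholder value here.
spark.conf.set("spark.sql.shuffle.partitions", 2000)
print(spark.conf.get("spark.sql.shuffle.partitions"))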

My spark cluster configuration:

2 workers + 1 master
Each worker node has 256 GB of RAM and 32 cores.
The master node has 8 cores and 32 GB of RAM.

The configuration I'm specifying for my spark job is:

{
    "driverMemory": "8G", 
    "driverCores": 4, 
    "executorMemory": "20G", 
    "executorCores": 4, 
    "numExecutors": 14, 
    "conf": {
        "spark.sql.shuffle.partitions": 2000000
    }
}

Following are some screenshots from Ambari regarding the configurations of the cluster:

[Screenshot: YARN memory]

[Screenshot: YARN CPU]

Can somebody please help me understand why there is a memory issue and how to fix it? Thanks.

Upvotes: 2

Views: 5237

Answers (2)

firsni

Reputation: 906

Why don't you concatenate 'Year', 'Month', 'Day', 'Hour', 'Minute', 'Second' into a single key before doing the groupBy? After the groupBy you can recreate these columns. I would also try leaving executor-cores unchanged first, then reducing it to 15, and then to 7; 4 is too low, I think.
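A minimal sketch of that idea, reusing df and exprs from the question (the column name ts_key and the '-' separator are my own choices):

from pyspark.sql import functions as F

# Build a single grouping key from the time columns ('ts_key' is a made-up name).
df_keyed = df.withColumn(
    'ts_key',
    F.concat_ws('-', 'Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'),
)

# Aggregate on the single key, reusing the exprs list from the question.
result = df_keyed.groupBy('ts_key').agg(*exprs)

# Recreate the original columns from the key afterwards (they come back as strings,
# so cast them if you need the original types).
parts = F.split(F.col('ts_key'), '-')
for i, name in enumerate(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second']):
    result = result.withColumn(name, parts.getItem(i))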

Upvotes: 0

Mayank Pande

Reputation: 21

I believe this is happening because of data skew and one of your partitions is getting OOM.

Spark's groupBy() requires all of the values for a given key to be held in memory at once to perform the aggregation.

Increasing the number of shuffle partitions isn't helping because a large share of your data may have the same groupBy key. Check whether your data is skewed on the groupBy key.

Check this article which explains this better.
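One quick way to check for that kind of skew, sketched against the df and grouping columns from the question:

from pyspark.sql import functions as F

# Count rows per groupBy key and inspect the heaviest keys;
# if a few keys dominate, the data is skewed.
key_cols = ['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second']

(df.groupBy(key_cols)
   .count()
   .orderBy(F.desc('count'))
   .show(20, truncate=False))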

Upvotes: 2
