mohsen
mohsen

Reputation: 93

Databricks SparkException exceeding spark.driver.maxResultSize

I am running the below code on Azure Databricks DBR 7.3 LTS, spark 3.0.1, scala 2.12 On a cluster of (20 to 35) workers of Standard_E4as_v4 (32.0 GB Memory, 4 Cores, 1 DBU) VMs And Driver of type Standard_DS5_v2 (56.0 GB Memory, 16 Cores, 3 DBU)

The aim is to process ~5.5 TB of data

I am facing the following exception: "org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1165 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB" After processing 1163 out of 57071, with 148.4 GiB being processed of the data, in 6.1 min

I don't do a collect or transfer data to the driver, does partitioning data causes this issue? If this is the case:

Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)

val jsonDF = spark.read.json("/mnt/myfile")

val res = jsonDF
      .withColumn("row", row_number.over(w))
      .where($"row" === 1)
      .drop("row")

res.write.json("/mnt/myfile/spark_output")

I then have tried only to load and write data again with no transformation, and faced same issue, code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val jsonDF = spark.read.json("/mnt/myfile")

jsonDF.write.json("/mnt/myfile/spark_output")

Upvotes: 0

Views: 5182

Answers (1)

badger
badger

Reputation: 3246

the write method sends the result of the writing operation for all partitions back to the Driver and due to the large volume of data (and many partitions) this exception occurred, try to increase the spark.driver.maxResultSize to see it works.

from documentation:

Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.

this is useful also.

Upvotes: 5

Related Questions