arohland

Reputation: 116

Understand Databricks Structured Streaming Spill to Disk Behavior

I am running a streaming pipeline on Databricks using PySpark (128 GB memory cluster with DBR 14.3, Spark 3.5.0). The stream processes zipped JSON files and merges them into a Delta table. The data for this pipeline has about 20 columns and does not contain any memory-heavy content.

We receive notifications of newly added files from an Azure queue. The files are processed using Auto Loader combined with a glob pattern. We run with trigger(processingTime="10 minutes") to regularly check for new files and merge them into our table. For debugging I have set the stream to process only a single file per batch.
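For reference, the read side of the stream looks roughly like this (paths, schema location and option values are simplified placeholders, not our real settings; spark is the built-in SparkSession on Databricks):

# Auto Loader in file notification mode, fed by the Azure queue,
# reading zipped JSON files that match a glob pattern.
raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/schema")
    .load("/mnt/landing/*/data/*.json.gz")
)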

We write the stream using foreachBatch to apply some transformations to each batch (adding columns, deduplicating, grouping by the index columns) and then merge the batch into a target Delta table. Matched records get two columns updated and not-matched records are inserted into the table.
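The write side, simplified (table and column names are placeholders for our real schema, not the actual pipeline code):

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def process_batch(batch_df, batch_id):
    # Add columns, deduplicate and reduce to one row per index key.
    cleaned = (
        batch_df
        .withColumn("ingested_at", F.current_timestamp())
        .dropDuplicates(["index_col_1", "index_col_2"])
    )

    target = DeltaTable.forName(spark, "target_table")
    (
        target.alias("t")
        .merge(
            cleaned.alias("s"),
            "t.index_col_1 = s.index_col_1 AND t.index_col_2 = s.index_col_2",
        )
        .whenMatchedUpdate(set={"value": "s.value", "updated_at": "s.ingested_at"})
        .whenNotMatchedInsertAll()
        .execute()
    )

(
    raw_stream.writeStream                      # raw_stream from the snippet above
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/mnt/checkpoints/target_table")
    .trigger(processingTime="10 minutes")
    .start()
)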

The problem: I left the stream running for a few days and it processes the arriving files well. But each batch increases the consumed memory and also the used filesystem space. After a few days more than 500 GB of filesystem space was in use, which caused our streams to crash. I am trying to find out what causes Spark to require more and more memory and fill up the filesystem.

Looking at the Databricks Spark UI I found that several hundred GB of data were written to the IO cache. But that seems to be far more data than the pipeline actually processed, and it does not get released again.

Any recommendations on which settings to change to keep our pipeline from crashing?

Upvotes: 0

Views: 80

Answers (1)

JayashankarGS

Reputation: 8160

The disk I/O cache creates a copy of your data files in the nodes' local storage, which improves read speed.

But these cached files are only removed in LRU (least recently used) fashion, when the underlying file changes, or manually when the cluster is restarted.

To enable and disable the disk cache, run:

spark.conf.set("spark.databricks.io.cache.enabled", "[true | false]")

Disabling it is not recommended, as that impacts read performance. What you can do instead is limit the disk cache via its configuration settings:

spark.databricks.io.cache.maxDiskUsage 50g
spark.databricks.io.cache.maxMetaDataCache 1g
spark.databricks.io.cache.compression.enabled false

Refer to the Databricks documentation on disk caching for more details.

Additionally, check the following:

  1. Manually unpersist DataFrames after processing in foreachBatch:
def process_batch(df, batch_id):
    transformed_df = df.withColumn("new_col", ...)  # Your transformations
    transformed_df.persist()  # only relevant if the batch DataFrame is cached/reused
    transformed_df.write.format("delta").mode("append").saveAsTable("your_table")

    transformed_df.unpersist()  # release the cached blocks once the batch is written
  2. Don't cache unnecessary DataFrames.
  3. Optimize your target Delta table.
  4. Run VACUUM, which removes unwanted/unused files (see the sketch after this list).
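Both maintenance steps can be run from a notebook or a scheduled job; "your_table" below is a placeholder for the actual target table name:

# "your_table" is a placeholder for the target Delta table.
# OPTIMIZE compacts the many small files produced by frequent merges;
# VACUUM deletes data files no longer referenced by the Delta log once
# they are older than the retention period (7 days by default).
spark.sql("OPTIMIZE your_table")
spark.sql("VACUUM your_table")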

Upvotes: 1
