born to hula

Reputation: 1286

Writing DataFrame to Parquet or Delta Does not Seem to be Parallelized - Taking Too Long

Problem Statement

I've read a partitioned CSV file into a Spark DataFrame.

To leverage the improvements of Delta tables, I'm trying to simply export it as Delta to a directory in Azure Data Lake Storage Gen2. I'm using the code below in a Databricks notebook:

%scala

df_nyc_taxi.write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

The whole DataFrame is around 160 GB.

Hardware Specs

I'm running this code using a cluster with 12 Cores and 42 GB of RAM.

However, it looks like the whole writing process is being handled by Spark/Databricks sequentially, i.e. in a non-parallel fashion:

[Screenshot: Spark job progress showing the write running sequentially]

The DAG Visualization looks like the following:

[Screenshot: DAG visualization of the write job]

All in all, it looks like this will take 1-2 hours to execute.
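For reference, a quick way to sanity-check this (using the same DataFrame as above) is to look at how many partitions the DataFrame has, since each partition is written by a single task:

// Each partition becomes one write task, so a small number here limits write parallelism.
println(df_nyc_taxi.rdd.getNumPartitions)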

Questions

Why does the write appear to run on a single task, and how can I get it to run in parallel so it finishes faster?

Upvotes: 4

Views: 5420

Answers (2)

murtihash

Reputation: 8410

This is similar to the other answer; however, I have added a persist after the repartition and before the write. The persist keeps as much as possible in memory, and whatever is left over once memory is full spills to disk, which is still faster than re-reading the data. This has worked well for me in the past. I chose 1250 partitions because 128 MB is my usual target partition size (160 GB / 128 MB ≈ 1250). Spark became what it is because of in-memory computation, so it is a best practice to use it whenever you have the chance.

from pyspark.sql import functions as F
from pyspark import StorageLevel

# Repartition by year/month into 1250 partitions, cache with spill to disk, then write.
df_nyc_taxi.repartition(1250, F.col("year"), F.col("month"))\
    .persist(StorageLevel.MEMORY_AND_DISK).write.partitionBy("year", "month")\
    .format("delta").save("/mnt/delta/")

Upvotes: 1

Long Vu

Reputation: 391

To follow up on @eliasah's comment, perhaps you can try this:

import org.apache.spark.sql.functions._
// The extra random column salts the repartitioning so each year/month is spread over multiple tasks.
df_nyc_taxi.repartition(col("year"), col("month"), lit(rand() * 200)).write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

The suggestion from @eliasah would most likely create only one file for each directory "/mnt/delta/year=XX/month=XX", and only one worker would write the data to each file. The extra column further slices the data (in this case I'm splitting each year/month's data into up to 200 smaller chunks; you can adjust the number if you like), so that more workers can write concurrently.
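One thing to keep in mind (assuming default Spark settings): repartition with only column expressions produces spark.sql.shuffle.partitions output partitions, 200 by default, so if you want more parallelism than that you can raise the setting before the write, for example:

// Illustrative value only; repartition by columns uses spark.sql.shuffle.partitions partitions.
spark.conf.set("spark.sql.shuffle.partitions", "400")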

P.S.: sorry, I don't have enough rep to comment yet :'D

Upvotes: 5
