retyrety149

Reputation: 21

Writing a dataframe to disk taking an unrealistically long time in Pyspark (Spark 2.1.1)

I'm running PySpark on a single server with multiple CPUs. All other operations (reading, joining, filtering, custom UDFs) execute quickly; only writing to disk is slow. The dataframe I'm trying to save is around 400 GB with 200 partitions.

 sc.getConf().getAll()

The driver memory is 16g, and the working directory has enough space (> 10 TB).

I'm trying to save using the following command:

 df.repartition(1).write.csv("out.csv")

Wondering if anyone has run into the same issue. Also, would changing any of the config parameters before PySpark is invoked help solve the issue?
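For reference, these are the kinds of settings I've been looking at (the values below are placeholders, not recommendations):

from pyspark.sql import SparkSession

# Placeholder values; these are the knobs I was considering, not a recipe.
spark = (
    SparkSession.builder
    .appName("model")
    .config("spark.local.dir", "/path/with/space")   # scratch dir for shuffle spills
    .config("spark.sql.shuffle.partitions", "200")   # partition count after shuffles
    .getOrCreate()
)
# Note: spark.driver.memory has to be set before the JVM starts,
# e.g. `pyspark --driver-memory 16g`; setting it here has no effect.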

Edits (a few clarifications):

When I say the other operations executed quickly, I mean there was always an action after each transformation (in my case, row counts), so all of those steps genuinely completed fast. I still haven't figured out why writing takes such a ridiculous amount of time.

One of my colleagues suggested that the disks in our server might have a limit on concurrent writes, which could be slowing things down; I'm still investigating this. I'd be interested to know whether others are seeing slow write times on a Spark cluster too. I have confirmation from one user who sees this on an AWS cluster.

Upvotes: 1

Views: 10373

Answers (2)

retyrety149

Reputation: 21

After a lot of trial and error, I realized that the issue was due to the method I used to read the file from disk. I was using the built-in read.csv function, and when I switched over to the reader in the databricks-csv package the problem went away. I'm now able to write files to disk in a reasonable time. It's really strange; maybe it's a bug in 2.1.1, or maybe the databricks-csv package is just very well optimized.

1. The built-in read.csv method

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("model") \
    .config("spark.worker.dir", "xxxx") \
    .getOrCreate()
# Read with the built-in CSV source
df = spark.read.load("file.csv", format="csv", header=True)
# ... processing ...
df.write.csv("file_after_processing.csv")

2. Using the databricks-csv package

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Read and write through the spark-csv data source
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('file.csv')
df.write.format('com.databricks.spark.csv').save('file_after_processing.csv')
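Side note: as far as I know, spark-csv was merged into Spark itself in 2.0, so the built-in reader should in principle be equivalent. A minimal sketch of the same pipeline with the native API (file names are placeholders, and an existing `spark` session is assumed):

df = spark.read.csv('file.csv', header=True, inferSchema=True)
df.write.csv('file_after_processing.csv', header=True)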

Upvotes: 1

user9017304

Reputation: 31

All other operations (reading, joining, filtering, custom UDFs)

Those are fast because they are transformations; they don't do anything until the data actually has to be saved.
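A minimal sketch of that laziness (table and column names are made up):

result = df.join(lookup, "id").filter(df["value"] > 0)  # returns instantly: just builds a plan
result.explain()         # prints the physical plan; still nothing has run
result.write.csv("out")  # action: join, filter, and write all execute here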

The dataframe I'm trying to save is of size around ~400 gb (...) I'm trying to save using the following command:

df.repartition(1).write.csv("out.csv")

That just cannot work well. Even ignoring the fact that you're on a single machine, saving 400 GB with a single thread (!) is hopeless. Even if it succeeds, it's no better than using a plain bash script.
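If a single output file isn't strictly required, a sketch of the alternative (directory names are placeholders): keep the existing partitioning so every core writes a part file in parallel, or coalesce to a modest number for fewer files without a full shuffle.

# Let every core write its own part file in parallel:
df.write.csv("out_dir", header=True)

# Fewer, larger files without repartition's full shuffle:
df.coalesce(16).write.csv("out_dir_16", header=True)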

Spark aside, sequential writes of 400 GB will take a substantial amount of time even on an average disk. And given multiple shuffles (join, repartition), the data will be written to disk several times over.
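A back-of-envelope estimate, assuming roughly 150 MB/s sequential throughput for a single disk:

# Rough lower bound for one sequential pass over 400 GB on a single disk.
data_gb = 400
throughput_mb_s = 150                        # assumed sequential disk throughput
minutes = data_gb * 1024 / throughput_mb_s / 60
print(f"~{minutes:.0f} minutes per pass")    # ~46 minutes, before any shuffle rewrites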

Upvotes: 3
