Omkar Puttagunta

Reputation: 4156

Spark Dataframe Write to CSV creates _temporary directory file in Standalone Cluster Mode

I am running a Spark job on a standalone cluster that has 2 worker nodes. I am using the code below (Spark Java API) to save the computed dataframe as CSV on the worker nodes.

dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);

I am trying to understand how Spark writes multiple part files on each worker node.

Run 1) worker1 has part files and _SUCCESS; worker2 has _temporary/task*/part* (each task directory holds the part files that task wrote).

Run 2) worker1 has part files and also a _temporary directory; worker2 has multiple part files.

Can anyone help me understand this behavior?

1) Should I treat the records in outputDir/_temporary as part of the output, along with the part files in outputDir?

2) Is the _temporary directory supposed to be deleted after the job runs, with its part files moved to outputDir?

3) Why can't Spark create the part files directly under the output directory?

coalesce(1) and repartition(1) are not an option, since the output itself will be around 500 GB.

Spark 2.0.2 and 2.1.3, Java 8, no HDFS

Upvotes: 7

Views: 8954

Answers (3)

Omkar Puttagunta

Reputation: 4156

After some analysis, I found that my Spark job was using FileOutputCommitter algorithm version 1, which is the default. I then added configuration to use version 2 instead and tested it on a 10-node Spark standalone cluster in AWS. All part-* files were generated directly under the outputDirPath specified in dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath).

The property can be set in either of two ways:

  1. By passing --conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2' to the spark-submit command

  2. By setting it on the Spark context: javaSparkContext.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")

I understand the consequences in case of failures, as outlined in the Spark docs, but this gave me the result I wanted.

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version, defaultValue is 1
The file output committer algorithm version, valid algorithm version number: 1 or 2. Version 2 may have better performance, but version 1 may handle failures better in certain situations, as per MAPREDUCE-4815.
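To make the difference between the two versions concrete, here is a toy model of the commit steps on a local directory, using only the Java standard library. It is a sketch, not Hadoop's actual code: the class CommitSketch, its method names, and the attempt_/committed_ directory names are hypothetical, and the real committer uses a deeper directory layout. It only illustrates that version 1 keeps task output under _temporary until the job commits, while version 2 moves it into the output directory at task commit.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy model of the two commit algorithms on a local directory.
// Directory names are simplified and hypothetical, not Hadoop's real layout.
final class CommitSketch {
    static final String TEMP = "_temporary";

    // A running task attempt writes only under outputDir/_temporary/attempt_<id>/.
    static void writeTaskOutput(Path outputDir, String attemptId, String fileName, String data)
            throws IOException {
        Path attemptDir = outputDir.resolve(TEMP).resolve("attempt_" + attemptId);
        Files.createDirectories(attemptDir);
        Files.write(attemptDir.resolve(fileName), data.getBytes(StandardCharsets.UTF_8));
    }

    // Task commit: version 2 moves files straight into outputDir,
    // version 1 parks them under _temporary until the job commits.
    static void commitTask(Path outputDir, String attemptId, int version) throws IOException {
        Path attemptDir = outputDir.resolve(TEMP).resolve("attempt_" + attemptId);
        Path target = (version == 2)
                ? outputDir
                : outputDir.resolve(TEMP).resolve("committed_" + attemptId);
        moveFiles(attemptDir, target);
    }

    // Job commit: promote parked task output (version 1), drop _temporary, write _SUCCESS.
    static void commitJob(Path outputDir) throws IOException {
        Path temp = outputDir.resolve(TEMP);
        if (Files.isDirectory(temp)) {
            for (Path dir : children(temp)) {
                if (dir.getFileName().toString().startsWith("committed_")) {
                    moveFiles(dir, outputDir);
                }
            }
            deleteRecursively(temp);
        }
        Files.createFile(outputDir.resolve("_SUCCESS"));
    }

    // Names of the part files a reader would see directly under outputDir.
    static List<String> visibleParts(Path outputDir) throws IOException {
        List<String> names = new ArrayList<>();
        for (Path p : children(outputDir)) {
            String name = p.getFileName().toString();
            if (name.startsWith("part-")) {
                names.add(name);
            }
        }
        return names;
    }

    private static List<Path> children(Path dir) throws IOException {
        try (Stream<Path> s = Files.list(dir)) {
            return s.sorted().collect(Collectors.toList());
        }
    }

    private static void moveFiles(Path from, Path to) throws IOException {
        Files.createDirectories(to);
        for (Path f : children(from)) {
            Files.move(f, to.resolve(f.getFileName()));
        }
        Files.delete(from);
    }

    private static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> s = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths = s.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        for (int version : new int[] {1, 2}) {
            Path out = Files.createTempDirectory("commit-v" + version);
            writeTaskOutput(out, "0", "part-00000.csv", "a,b\n");
            commitTask(out, "0", version);
            System.out.println("v" + version + " after task commit: " + visibleParts(out));
            commitJob(out);
            System.out.println("v" + version + " after job commit: " + visibleParts(out));
        }
    }
}
```

In this model, a job that never reaches commitJob leaves version 1 output stranded under _temporary, whereas version 2 has already exposed whatever tasks finished. That is the trade-off the quoted documentation hints at.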

Upvotes: 7

zero323

Reputation: 330303

TL;DR: To properly write (or read, for that matter) data using a file-system-based source, you need shared storage.

The _temporary directory is part of the basic commit mechanism used by Spark: data is first written to a temporary directory and, once all tasks have finished, atomically moved to the final destination. You can read more about this process in "Spark _temporary creation reason".

For this process to succeed you need a shared file system (HDFS, NFS, and so on) or equivalent distributed storage (like S3). Since you don't have one, failure to clean up temporary state is expected; see "Saving dataframe to local file system results in empty results".

The behavior you observed (data partially committed and partially not) can occur when some executors are co-located with the driver and share a file system with it, enabling a full commit for that subset of the data.
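As a rough way to see which state a given node's local directory is in, the following sketch inspects it with plain java.nio. The class OutputDirCheck and its methods are hypothetical names for illustration. It assumes the default behavior in which a _SUCCESS marker is written at job commit, and it treats any part file still under _temporary as uncommitted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for inspecting one node's local copy of the output directory.
final class OutputDirCheck {

    // True when the _SUCCESS marker exists and no _temporary directory is left behind.
    static boolean looksCommitted(Path outputDir) {
        return Files.isRegularFile(outputDir.resolve("_SUCCESS"))
                && !Files.exists(outputDir.resolve("_temporary"));
    }

    // Part files still sitting under _temporary, as paths relative to outputDir.
    static List<Path> uncommittedParts(Path outputDir) throws IOException {
        Path temp = outputDir.resolve("_temporary");
        if (!Files.isDirectory(temp)) {
            return Collections.emptyList();
        }
        try (Stream<Path> s = Files.walk(temp)) {
            return s.filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().startsWith("part-"))
                    .map(outputDir::relativize)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the local output directory as the first argument (defaults to ".").
        Path outputDir = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println("looks committed: " + looksCommitted(outputDir));
        for (Path p : uncommittedParts(outputDir)) {
            System.out.println("uncommitted: " + p);
        }
    }
}
```

Note that without shared storage this only describes the node it runs on: a clean directory on one worker says nothing about the part files that other workers wrote to their own local disks.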

Upvotes: 4

Karthick

Reputation: 662

The number of part files depends on how your dataframe is partitioned: by default, one file is written per partition that the DataFrame has at the time you write it out.

You can control this with coalesce or repartition, to reduce or increase the number of partitions.

If you use coalesce(1), you won't see multiple part files, but the data is then no longer written in parallel.

[outputDirPath = /tmp/multiple.csv ]

dataframe
 .coalesce(1)
 .write().option("header","false")
 .mode(SaveMode.Overwrite)
 .csv(outputDirPath);

As for your question on how to refer to the output: use /tmp/multiple.csv to refer to all of the part files below.

/tmp/multiple.csv/part-00000.csv
/tmp/multiple.csv/part-00001.csv
/tmp/multiple.csv/part-00002.csv
/tmp/multiple.csv/part-00003.csv

Upvotes: 2
