morfious902002
morfious902002

Reputation: 918

Slow Parquet write to HDFS using Spark

I am using Spark 1.6.1 and writing to HDFS. In some cases it seems like all the work is being done by one thread. Why is that?

Also, I need parquet.enable.summary-metadata to register the parquet files to Impala.

Df.write().partitionBy("COLUMN").parquet(outputFileLocation);

It also, seems like all of this happens in one cpu of a executor.

16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_000029_0
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3  times so far)
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

Then again :-

16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting.
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0  time so far)
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1  time so far)
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time.
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

The Schema

About 200 of the following lines again and again 20 times or so.

16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk.
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651

About 200 of the following lines

16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp}

Then at last:-

16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_000040_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_000040
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_000040_0: Committed
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver

Update: parquet.enable.summary-metadata set to false.
Reduced partitions to 21.

Df.write().mode(SaveMode.Append).partitionBy("COL").parquet(outputFileLocation);

It did improve speed but still takes an hour to complete.

Update :- The reason for most of the issue is multiple left outer join with very small data being materialized just before write. The spills are happening because of Append mode which keeps file open. The is default limit of 5 open files in this mode. You can increase this using property "spark.sql.sources.maxConcurrentWrites"

Upvotes: 7

Views: 8855

Answers (1)

morfious902002
morfious902002

Reputation: 918

Finally after some optimizations in the code before reaching the write part we got better write times. Before we could not do repartition as the shuffles were more than 4-5 Gb. After previous changes, I changed the code from coalesce to repartition which distributed the data across all executors there by giving each CPU in executors about the same amount of data to write. So, if you see that the parquet files created by your jobs vary in size than try to repartition your Dataframe before write.

Also, this can help with write performance too :-

sc.hadoopConfiguration.set("parquet.enable.dictionary", "false")

Upvotes: 2

Related Questions