Minh Thai

Reputation: 578

Spark slow repartitioning many small files

I'm trying to read a folder that consists of many small parquet files (600 files, 500 KB each) and then repartition them into just 2 files.

val df = spark.read.parquet("folder")
df.repartition(2).write.mode("overwrite").parquet("output_folder")

And this is horribly slow, taking up to 10 minutes. In the Spark UI I can see 2 executors handling the 2 tasks. I give each executor 10 GB of memory.

[screenshot: Spark UI showing the 2 tasks]

So what is the reason for the slow speed? Is it because of disk IO? And how can I improve the performance in this case?

Edit: I also tried using coalesce and the performance doesn't look different.
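For reference, the coalesce variant of the same job looks like this (just a minimal sketch, swapping the call and keeping the same paths as above):

val df = spark.read.parquet("folder")
// coalesce(2) merges existing partitions without the full shuffle that repartition(2) triggers
df.coalesce(2).write.mode("overwrite").parquet("output_folder")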

Upvotes: 0

Views: 4109

Answers (2)

Ram Ghadiyaram

Reputation: 29185

First option: merge the small parquet files together at the source level into a smaller number of bigger files, each > 128 MB (or whatever size you want):

how to merge multiple parquet files to single parquet file using linux or hdfs command?

Second option, i.e. using Spark: read the small parquet files and then, before your actual business-processing logic, write them out into relatively big files of the size you expect (taking performance factors into consideration).


Second option:

I am not aware of your exact Spark job configuration, but in general coalesce should work. Try something like the example below (master -> local, but change it to yarn for your app), which worked for me. In this example I took small files "./userdata*.parquet" (5 small files, all around 110 KB) under src/main/resources and merged them into 2 final files with coalesce...

Approach: read each parquet file as a DataFrame, union them into a single DataFrame, and then coalesce it.

package com.examples

import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
  * take small pegs and make a large peg
  * and coalesce it
  *
  * @author : Ram Ghadiyaram
  */
object ParquetPlay extends Logging {
  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {

    val appName = if (args.length > 0) args(0) else this.getClass.getName
    val spark: SparkSession = SparkSession.builder
      .config("spark.master", "local") // change to yarn when running on a cluster
      .appName(appName)
      .getOrCreate()

    // list all the small parquet files matching the glob pattern
    val fs = FileSystem.get(new Configuration())
    val files = fs.globStatus(new Path("./userdata*.parquet")).map(_.getPath.toString)
    println(files.length)
    files.foreach(println)

    // read each parquet file as a DataFrame and union them into a single DataFrame
    val dfs: Array[DataFrame] = files.map(file => spark.read.parquet(file))
    val finalDF = dfs.reduce(_ union _)
    finalDF.show(false)

    // write the merged data back out as 2 roughly equal-sized files
    println(System.getProperty("java.io.tmpdir"))
    finalDF.coalesce(2)
      .write
      .mode(SaveMode.Overwrite)
      .parquet(s"${System.getProperty("java.io.tmpdir")}/final.parquet")
    println("done")
  }
}

Result: 2 nearly equal-sized files, as shown below. In this example it again generated small files, but in your case, since you have around 600 files of 500 KB each, you can look at the resulting file sizes and decide on coalesce(<the number of partitions you expect>).

[screenshot: the 2 output parquet files, roughly equal in size]
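As a rough sizing sketch: 600 files of ~500 KB each is roughly 300 MB of input, so instead of hard-coding 2 you can derive the partition count from a target output file size (128 MB here is an assumed target, not something from the question; df is the DataFrame read from the small files):

val targetFileSizeBytes = 128L * 1024 * 1024  // assumed target of ~128 MB per output file
val totalInputBytes = 600L * 500 * 1024       // ~300 MB, from the question's 600 x 500 KB files
val numOutputFiles = math.max(1, math.ceil(totalInputBytes.toDouble / targetFileSizeBytes).toInt)
// ~300 MB / 128 MB => 3 output files
df.coalesce(numOutputFiles).write.mode("overwrite").parquet("output_folder")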

Third option: as Minh (the original poster) mentioned in the comments, the files might contain a lot of data that is highly compressed and only looks small on disk, and that might be what is causing this.
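A quick way to check whether heavy compression is hiding a much larger dataset is to cache it and compare the in-memory size (Storage tab of the Spark UI) with the ~300 MB on disk (a minimal sketch, assuming the folder from the question):

val df = spark.read.parquet("folder")
df.cache()
df.count() // materializes the cache; then compare the size in the Spark UI's Storage tab with the on-disk size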

Upvotes: 0

This is a trade-off Spark currently has (and release 3.0 should address it): the number of tasks maps 1:1 to the number of files, so the greater the number of tasks the better for performance, but that is really not ideal from a partitioning perspective, as the files might be quite small in that case.
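To see that mapping in practice, you can compare the partition counts before and after repartitioning (a quick sketch against the folder from the question):

val df = spark.read.parquet("folder")
println(df.rdd.getNumPartitions)                // number of read partitions, driven by the input files
println(df.repartition(2).rdd.getNumPartitions) // 2, so only 2 tasks perform the final write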

Another issue is that, most of the time, the final repartitioned dataset will grow in volume, as the compression algorithms no longer have information about keys. For real-life Big Data that is a major problem, as disk space usage grows considerably. That is particularly true for very nested datasets.
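One common way to give the compression codecs some locality back is to sort within partitions by columns with many repeated values before writing (a sketch; "key" is a hypothetical column name, not one from the question):

import org.apache.spark.sql.functions.col
df.repartition(2, col("key"))   // "key" is a hypothetical column used only for illustration
  .sortWithinPartitions("key")  // groups similar values so dictionary/run-length encoding compresses better
  .write.mode("overwrite").parquet("output_folder")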

A solution for that is to flatten the datasets down into simple schemas, so that we can take advantage of the compression algorithms every time we write out to disk.
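Flattening can be as simple as selecting nested fields out into top-level columns before writing (a sketch with a hypothetical struct column a holding fields b and c, just to show the shape):

import org.apache.spark.sql.functions.col
// hypothetical nested schema: struct column "a" with fields "b" and "c"
val flatDF = df.select(col("a.b").alias("a_b"), col("a.c").alias("a_c"))
flatDF.write.mode("overwrite").parquet("output_folder_flat")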

Hope that helps!

Upvotes: 0
