Reputation: 578
I'm trying to read a folder consisting of many small parquet files (600 files, 500 KB each) and then repartition them into 2 files.
val df = spark.read.parquet("folder")
df.repartition(2).write.mode("overwrite").parquet("output_folder")
This is horribly slow, taking up to 10 minutes. In the Spark UI I can see 2 executors handling 2 tasks, and I give each executor 10 GB of memory.
So what is the reason for the slow speed? Is it disk IO? And how can I improve performance in this case?
Edit: I also tried coalesce and the performance doesn't look any different.
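For reference, the coalesce attempt was roughly this (same placeholder folder names as above):
// coalesce(2) avoids the full shuffle that repartition(2) does,
// but the write still happens in only 2 tasks, so all 600 small files
// are funnelled through 2 writers either way.
val df = spark.read.parquet("folder")
df.coalesce(2).write.mode("overwrite").parquet("output_folder")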
Upvotes: 0
Views: 4109
Reputation: 29185
First option: make bigger files out of the small parquet files at the source level, i.e. merge them into files of > 128 MB (or whatever size you want). See:
how to merge multiple parquet files to single parquet file using linux or hdfs command?
Second option, using Spark: read the small parquet files and then, before the actual business processing logic, write them out as relatively big files of the size you expect (taking performance factors into consideration).
For the second option:
I don't know your Spark job configuration, but in general coalesce
should work. Try an example like the one below (master is local here; change it to yarn for your app), which worked for me.
In this example I took the small files "./userdata*.parquet" (5 small files, each around 110 KB) under src/main/resources
and merged them into 2 final files with coalesce
...
Approach: read each parquet file as a dataframe, union them into a single dataframe, and then coalesce it.
package com.examples

import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable

/**
 * Take small pegs, make a large peg and coalesce it.
 *
 * @author : Ram Ghadiyaram
 */
object ParquetPlay extends Logging {
  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    val appName = if (args.length > 0) args(0) else this.getClass.getName
    val spark: SparkSession = SparkSession.builder
      .config("spark.master", "local") // change to yarn for your app
      .appName(appName)
      .getOrCreate()

    // List all the small parquet files matching the glob pattern
    // public FileStatus[] globStatus(Path pathPattern) throws IOException
    val fs = FileSystem.get(new Configuration())
    val files = fs.globStatus(new Path("./userdata*.parquet")).map(_.getPath.toString)
    println(files.length)
    files.foreach(println)

    // Read each parquet file into its own DataFrame
    val dfSeq = mutable.MutableList[DataFrame]()
    files.foreach(file => dfSeq += spark.read.parquet(file))
    println(dfSeq.length)

    // Union all of them into a single DataFrame
    val finalDF = dfSeq.reduce(_ union _).toDF
    finalDF.show(false)

    println(System.getProperty("java.io.tmpdir"))
    println(System.getProperties.toString)

    // Coalesce down to 2 output files and write them out
    finalDF.coalesce(2)
      .write
      .mode(SaveMode.Overwrite)
      .parquet(s"${System.getProperty("java.io.tmpdir")}/final.parquet")
    println("done")
  }
}
Result: 2 nearly equal sized files. In this example it again generated small files, but in your case, with around 600 files of 500 KB each, you can check the resulting file sizes and decide the coalesce value (the number of partitions you expect).
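If you want to derive the coalesce value instead of hard-coding 2, a rough sketch (reusing fs and finalDF from the example above, with 128 MB as an assumed target file size) is:
// Estimate how many output partitions give roughly targetFileSize bytes each.
val targetFileSize = 128L * 1024 * 1024 // assumed target; pick whatever size you want
val totalBytes = fs.globStatus(new Path("./userdata*.parquet")).map(_.getLen).sum
val numPartitions = math.max(1, math.ceil(totalBytes.toDouble / targetFileSize).toInt)
finalDF.coalesce(numPartitions)
  .write
  .mode(SaveMode.Overwrite)
  .parquet(s"${System.getProperty("java.io.tmpdir")}/final.parquet")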
Third option: as Minh (the original poster) mentioned in the comments, the files might be highly compressed data that only looks small after compression, which could be causing this.
Upvotes: 0
Reputation: 388
This is a trade-off Spark currently has (and release 3.0 should address it): the number of write tasks maps 1:1 to the number of output files, so the greater the number of tasks the better for performance, but that is really not ideal from a partitioning perspective, as the files might be quite small in that case.
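As a quick illustration (assuming a SparkSession named spark; the path and row count are arbitrary), each partition at write time becomes one part file:
// Each partition of the dataset is written by one task, producing one part file.
val demo = spark.range(0, 1000000)
println(demo.rdd.getNumPartitions) // this many write tasks => this many part-*.parquet files
demo.write.mode("overwrite").parquet("/tmp/parts_demo")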
Another issue is that, most of the time, the final repartitioned dataset will grow in volume, because the compression algorithms no longer have information about keys. For real-life Big Data that is a major problem, as disk space usage grows considerably. That is particularly true for very nested datasets.
A solution for that is to flatten the datasets down to simple schemas, so that we can take advantage of the compression algorithms every time we write out to disk.
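As an illustration only (the dataframe df and the column names are made up), flattening a nested struct before writing can look like this:
import org.apache.spark.sql.functions.col
// Hypothetical nested column "address"; selecting its fields as top-level
// columns gives the writer a flat schema, which tends to encode/compress better.
val flatDF = df.select(
  col("id"),
  col("address.city").as("address_city"),
  col("address.zip").as("address_zip")
)
flatDF.write.mode("overwrite").parquet("flat_output")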
Hope that helps!
Upvotes: 0