Reputation: 3542
I have many parquet file directories on HDFS, each containing a few thousand small (mostly < 100 KB) parquet files. They slow down my Spark job, so I want to combine them.
With the following code I can repartition a local parquet file into a smaller number of parts:
val pqFile = sqlContext.read.parquet("file:/home/hadoop/data/file.parquet")
pqFile.coalesce(4).write.save("file:/home/hadoop/data/fileSmaller.parquet")
But I don't know how to get the size of a directory on HDFS programmatically from Scala code, so I can't work out the number of partitions to pass to the coalesce function for the real data set.
How can I do this? Or is there a convenient way within Spark to configure the writer to write parquet partitions of a fixed size?
Upvotes: 6
Views: 13644
Reputation: 13008
You could try
pqFile.inputFiles.size
which returns "a best-effort snapshot of the files that compose this DataFrame" according to the documentation.
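For example, if you assume the small input files are roughly uniform in size, a minimal sketch could derive a coalesce factor from that file count; the 128-files-per-output-part value and the output path here are purely illustrative assumptions, not values from the question:
// Sketch: merge roughly `filesPerOutputPart` small input files into each output file.
// `filesPerOutputPart` and the output path are assumptions chosen for illustration.
val filesPerOutputPart = 128
val numParts = math.max(1, pqFile.inputFiles.size / filesPerOutputPart)
pqFile.coalesce(numParts).write.save("hdfs:///data/fileSmaller.parquet")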
As an alternative, directly on the HDFS level:
val hdfs: org.apache.hadoop.fs.FileSystem =
  org.apache.hadoop.fs.FileSystem.get(
    new org.apache.hadoop.conf.Configuration())
val hadoopPath = new org.apache.hadoop.fs.Path("hdfs://localhost:9000/tmp")
val recursive = false
// listFiles returns a Hadoop RemoteIterator of LocatedFileStatus
val ri = hdfs.listFiles(hadoopPath, recursive)
// Wrap the RemoteIterator in a Scala Iterator so collection methods can be used
val it = new Iterator[org.apache.hadoop.fs.LocatedFileStatus]() {
  override def hasNext = ri.hasNext
  override def next() = ri.next()
}
// Materialize the iterator
val files = it.toList
println(files.size)               // number of files
println(files.map(_.getLen).sum)  // total size in bytes
This way you get the file sizes as well.
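If you want output files of a roughly fixed size, one way to use that total is to divide it by a target partition size and pass the result to coalesce. A minimal sketch, assuming pqFile is the DataFrame read from that directory; the 128 MB target (a common HDFS block size) and the output path are assumptions for illustration:
// Sketch: derive a partition count from the total input size and a target
// output size. The 128 MB target and output path are illustrative assumptions.
val totalBytes = files.map(_.getLen).sum
val targetPartitionBytes = 128L * 1024 * 1024
val numParts = math.max(1, (totalBytes / targetPartitionBytes).toInt)
pqFile.coalesce(numParts).write.save("hdfs:///tmp/combined.parquet")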
Upvotes: 8