Reputation: 761
When writing a file to HDFS using Spark, the write is quite fast when no partitioning is used. When I partition the output, however, the write latency increases by a factor of ~24.
For the same file, writing without partitioning takes around 600 ms. Writing partitioned by id (which generates exactly 1,000 partitions, since there are 1,000 ids in the file) takes around 14 seconds.
Has anyone else experienced that writing a partitioned file takes a very long time? What is the root cause of this — perhaps that Spark needs to create 1,000 folders and files, one per partition? Do you have an idea how this can be sped up?
val myRdd = streamedRdd.map { case ((id, metric, time), value) =>
  Record(id, metric, getEpoch(time), time, value)
}
val df = myRdd.toDF

df.write
  .mode(SaveMode.Append)
  .partitionBy("id")
  .parquet(path)
Upvotes: 3
Views: 6450
Reputation: 624
Spark executors write their data to HDFS in parallel, so the write cost depends on how your data is spread across the cluster after partitioning.
For many small chunks of data, the overhead of establishing connections from multiple executor nodes to HDFS and writing many small files dominates, compared to writing the entire file sequentially.
How to avoid this:
By default, Spark partitions the data using a hash partitioner (the key is hashed, and keys with the same hash go to the same node). Try specifying a range partitioner instead; please find the sample snippets below.
The following snippet uses the default hash partitioner:
yourRdd.groupByKey().saveAsTextFile("HDFS PATH")
The following snippet uses a custom range partitioner. It creates 8 partitions, as specified in RangePartitioner(8, yourRdd), and writing through 8 connections is a better choice than writing through 1,000 connections:
val tunedPartitioner = new RangePartitioner(8, yourRdd)
val partitioned = yourRdd.partitionBy(tunedPartitioner)
partitioned.saveAsTextFile("HDFS PATH")
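To see why fewer partitions means fewer output files, here is a minimal, self-contained sketch of the hash assignment described above: Spark's HashPartitioner maps each key to key.hashCode modulo the partition count, shifted to stay non-negative. The object and method names here are illustrative, not Spark API:

```scala
object HashPartitionDemo {
  // Mirrors the idea behind Spark's HashPartitioner: hash the key,
  // then take a non-negative modulo to pick a partition index.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod // keep index in [0, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    // 1,000 distinct ids collapse into at most 8 buckets, so only a
    // handful of files (and HDFS connections) are needed instead of 1,000.
    val ids = (1 to 1000).map(i => s"id-$i")
    val buckets = ids.map(partitionFor(_, 8)).toSet
    println(buckets.size) // at most 8
  }
}
```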
Again, this is a trade-off between the amount of data to write and the number of partitions you create.
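The same idea can be applied on the DataFrame side, as in the question's code. A hedged sketch, assuming the df and path from the question: clustering rows by the partition column first means each id's data is handled by a single task, so each partition directory receives one file instead of many.

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Repartition by the same column used in partitionBy, so every id's
// rows end up in one task and each id directory gets a single file.
df.repartition(col("id"))
  .write.mode(SaveMode.Append)
  .partitionBy("id")
  .parquet(path)
```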
Upvotes: 1