Reputation: 2745
A Databricks Spark cluster can auto-scale with load.
I am reading gzip files in Spark and repartitioning the RDD to get parallelism, because a gzip file is read on a single core and produces an RDD with a single partition.
As per this post, the ideal number of partitions is the number of cores in the cluster, which I can set when repartitioning. But with an auto-scaling cluster this number varies with the state of the cluster and how many executors it currently has.
So, what should the partitioning logic be for an auto-scaling Spark cluster?
EDIT 1:
The folder is ever-growing; gzip files keep arriving in it periodically. Each gzip file is ~10GB (~150GB uncompressed). I know that multiple files can be read in parallel. But for a single very large file, Databricks may auto-scale the cluster, and even after scaling has added cores, my dataframe will still have the smaller number of partitions chosen from the previous cluster state (when it had fewer cores).
Even though my cluster will auto-scale (scale out), the processing will be limited to the number of partitions I set with:
num_partitions = <cluster cores before scaling>
df.repartition(num_partitions)
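One common workaround (a sketch, not a Databricks-specific API) is to read the cluster's parallelism at runtime via `spark.sparkContext.defaultParallelism` instead of hard-coding a core count, and to oversubscribe by a small factor so there is still task-level headroom if the cluster scales out after the repartition. The helper below is hypothetical; `factor` and `minimum` are illustrative tuning knobs, not Spark settings:

```python
# Hypothetical helper: derive the partition count from the cluster's
# *current* parallelism rather than a hard-coded core count. In a real
# job you would pass spark.sparkContext.defaultParallelism as
# `current_cores`, e.g. df.repartition(choose_partitions(sc.defaultParallelism)).
def choose_partitions(current_cores, factor=3, minimum=8):
    """Oversubscribe by `factor` so spare tasks exist if the cluster
    scales out after the repartition has already happened."""
    return max(current_cores * factor, minimum)

# On a 16-core cluster this yields 48 partitions:
print(choose_partitions(16))  # 48
```

Oversubscribing (2-4x the core count) is a widely used heuristic; the exact factor is workload-dependent.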
Upvotes: 0
Views: 947
Reputation: 1126
A standard gzip file is not splittable, so Spark will handle the gzip file with just a single core, a single task, no matter what your settings are (as of Spark 2.4.5/3.0). Hopefully the world will move to bzip2 or other splittable compression techniques when creating large files.
If you directly write the data out to Parquet, you will end up with a single (splittable) Parquet file, written by a single core. If you are stuck with the default gzip codec, it is better to repartition after the read and write out multiple Parquet files.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType

schema = StructType([
    StructField("a", IntegerType(), True),
    StructField("b", DoubleType(), True),
    StructField("c", DoubleType(), True)])

input_path = "s3a://mybucket/2G_large_csv_gzipped/onebillionrows.csv.gz"

# Allow up to ~1000 MB per partition when reading splittable sources
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))

df_two = spark.read.format("csv").schema(schema).load(input_path)

# Repartition after the single-core read so the write happens in parallel
df_two.repartition(32).write.format("parquet").mode("overwrite").save("dbfs:/tmp/spark_gunzip_default_remove_me")
I very recently found a splittable gzip codec, and initial tests are very promising. The codec actually reads the file multiple times: each task scans ahead by some number of bytes (without decompressing) and then starts the decompression. The benefit pays off when it comes time to write the dataframe out as Parquet: you end up with multiple files, all written in parallel, for greater throughput and shorter wall-clock time (your CPU hours will be higher).
Reference: https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md
My test case:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType

schema = StructType([
    StructField("a", IntegerType(), True),
    StructField("b", DoubleType(), True),
    StructField("c", DoubleType(), True)])

input_path = "s3a://mybucket/2G_large_csv_gzipped/onebillionrows.csv.gz"

spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))

# The splittable codec lets Spark assign multiple tasks to a single .gz file
df_gz_codec = (spark.read
               .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
               .schema(schema)
               .csv(input_path)
               )

# No repartition needed: the read itself is already parallel
df_gz_codec.write.format("parquet").save("dbfs:/tmp/gunzip_to_parquet_remove_me")
Upvotes: 1
Reputation: 2178
For a splittable file/data source, partitions are mostly created automatically, depending on the number of cores, whether the operation is narrow or wide, file size, etc. Partitions can also be controlled programmatically using coalesce and repartition. But for a gzip/un-splittable file there will be just one task per file, and as many can run in parallel as there are cores available (like you said).
For a dynamic cluster, one option you have is to point your job at a folder/bucket containing a large number of gzip files. Say you have 1000 files to process and 10 cores: then 10 will run in parallel. When your cluster dynamically grows to 20 cores, 20 will run in parallel. This happens automatically and you needn't code for it. The only catch is that if there are fewer files than available cores, the extra cores sit idle. This is a known deficiency of un-splittable files.
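The arithmetic above can be sketched as follows. Since each un-splittable gzip file is exactly one task, the job runs in "waves" of ceil(files / cores), and scaling out shrinks the number of waves without any code change (the function name here is illustrative):

```python
import math

# Sketch: with un-splittable gzip files, each file is one task, so the
# job completes in ceil(num_files / num_cores) waves of parallel work.
def waves(num_files, num_cores):
    return math.ceil(num_files / num_cores)

print(waves(1000, 10))  # 100 waves before scaling out
print(waves(1000, 20))  # 50 waves after scaling to 20 cores
print(waves(5, 10))     # 1 wave, but 5 of the 10 cores sit idle
```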
The other option would be to size the cluster for the job based on the number and size of the files available. You can derive an empirical formula from historical run times. Say you have 5 large files and 10 small files (half the size of a large one): then you might assign 20 cores (10 + 2*5) to use the cluster resources efficiently.
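The sizing formula in the example above weights each file by its relative size (a large file counts as two small ones), which can be written as a tiny helper. The function name and weighting are illustrative of this answer's example, not a general rule:

```python
# Illustrative sizing formula: small files count as 1 unit of work,
# large files (twice the size) count as 2, and one core is assigned
# per unit so all files finish in roughly one wave.
def suggested_cores(num_large, num_small):
    return num_small + 2 * num_large

print(suggested_cores(5, 10))  # 20, matching the example in the answer
```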
Upvotes: 1