Reputation: 57
I have millions of gzipped files to process and convert to Parquet. I'm running a simple Spark batch job on EMR to do the conversion, giving it a couple million files at a time.
However, I've noticed a big delay between when the job starts and when the files are listed and split up into a batch for the executors to convert. From what I have read and understood, the scheduler has to fetch the metadata for those files and schedule the tasks. This step takes 15-20 minutes to split a million files into tasks for a batch. Even though the actual work of listing the files and doing the conversion takes only 15 minutes on my cluster of instances, the overall job takes over 30 minutes. It appears that the driver spends a lot of time indexing all the files in order to split them into tasks. Is there any way to increase parallelism for this initial stage of indexing files and splitting up tasks for a batch?
I've tried increasing spark.driver.cores, thinking that it would increase parallelism, but it doesn't seem to have any effect.
Upvotes: 2
Views: 2533
Reputation: 843
This is a common problem with Spark (and other big data tools), because it uses only one driver to list all the files in the source (S3) along with their paths. Some more info here
I found this article really helpful for solving the issue. Instead of using Spark to list the files and fetch their metadata, we can use PureTools to create a parallelized RDD of the files and pass that to Spark for processing.
S3 Specific Solution
If you do not want to install and set up the tools from the guide above, you can also use an S3 manifest file to list all the files present in a bucket and iterate over them in parallel using RDDs.
Steps for S3 Manifest Solution
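# Create an RDD from the list of file paths (for example, taken from the S3 manifest)
pathRdd = sc.parallelize([file1, file2, file3, ......., file100])

# Create a function which reads the data of one file
def s3_path_to_data(path):
    # Get the data from S3 and return it in whichever iterable format
    # you like, i.e. a list of strings, one per record.
    # Placeholder: replace this line with your own S3 read logic.
    raise NotImplementedError("read the object at `path` and return its records")

# Call flatMap on pathRdd; each returned record becomes one element of dataRdd
dataRdd = pathRdd.flatMap(s3_path_to_data)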
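As a rough illustration of what the body of s3_path_to_data could look like for gzipped text files, here is a minimal sketch. It assumes paths of the form s3://bucket/key, newline-delimited UTF-8 content, and that boto3 is installed on the executors with working credentials. The helper names split_s3_path and gunzip_lines are made up for this example and are not part of Spark or boto3.

```python
import gzip
import io


def split_s3_path(path):
    """Split 's3://bucket/some/key.gz' into ('bucket', 'some/key.gz')."""
    scheme, sep, rest = path.partition("://")
    if not sep or scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"not an S3 path: {path!r}")
    bucket, _, key = rest.partition("/")
    if not bucket or not key:
        raise ValueError(f"missing bucket or key in: {path!r}")
    return bucket, key


def gunzip_lines(raw_bytes, encoding="utf-8"):
    """Decompress a gzipped payload and return its lines as strings."""
    with gzip.open(io.BytesIO(raw_bytes), mode="rt", encoding=encoding) as fh:
        return [line.rstrip("\n") for line in fh]


def s3_path_to_data(path):
    """Fetch one gzipped object from S3 and return its lines.

    Intended to run on the executors via pathRdd.flatMap(s3_path_to_data).
    """
    import boto3  # assumption: boto3 is available on every executor

    bucket, key = split_s3_path(path)
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return gunzip_lines(body)
```

This version creates a new S3 client for every file, which is simple but wasteful. With millions of files it may be worth switching to mapPartitions so that one client is reused for all paths in a partition.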
Details
Spark will create pathRdd with the default number of partitions and then call the s3_path_to_data function on the rows of each partition in parallel. Partitions play an important role in Spark parallelism: for example, if you have 4 executors and only 2 partitions, then only 2 executors will do the work. You can experiment with the number of partitions and the number of executors to get the best performance for your use case.
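To make the partition point concrete, the number of partitions can be passed explicitly through the numSlices argument of sc.parallelize instead of relying on the default. The sketch below uses a simple rule of thumb (a few tasks per available core, capped by the number of files). That rule, the default of 3 tasks per core, and the function names are assumptions for illustration only, not a Spark recommendation.

```python
def suggested_num_partitions(num_files, num_executors, cores_per_executor,
                             tasks_per_core=3):
    """Heuristic (an assumption): a few tasks per core, never more than files."""
    total_cores = num_executors * cores_per_executor
    return max(1, min(num_files, total_cores * tasks_per_core))


def build_path_rdd(sc, paths, num_executors, cores_per_executor):
    """Create the path RDD with an explicit partition count.

    `sc` is an existing SparkContext; `paths` is a list of S3 path strings.
    """
    n = suggested_num_partitions(len(paths), num_executors, cores_per_executor)
    return sc.parallelize(paths, numSlices=n)
```

For example, with 4 executors of 8 cores each and a million paths, this asks for 96 partitions, so every core has work to do and a slow task does not leave the rest of the cluster idle for long.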
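The following attributes are useful for inspecting how your RDD (or the RDD behind a DataFrame, via df.rdd) is partitioned, so you can fine-tune the Spark parameters:
rdd.getNumPartitions (called as rdd.getNumPartitions() in PySpark)
rdd.partitions.length (Scala API)
rdd.partitions.size (Scala API)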
Upvotes: 1
Reputation: 31
You can try setting the config below:
spark.conf.set("spark.default.parallelism", x)
where x = total_nodes_in_cluster * (total_core_in_node - 1) * 5
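As a worked example of the formula, the sketch below computes x and formats it as a spark-submit option. Passing the value at submit time is a choice made here on the assumption that spark.default.parallelism is read when the SparkContext starts, so changing it on an already running session may have no effect on RDD operations. The function names are hypothetical.

```python
def default_parallelism(total_nodes_in_cluster, total_core_in_node, factor=5):
    """x = total_nodes_in_cluster * (total_core_in_node - 1) * 5"""
    return total_nodes_in_cluster * (total_core_in_node - 1) * factor


def spark_submit_flag(x):
    """Format the value as a --conf option for spark-submit."""
    return f"--conf spark.default.parallelism={x}"


# Example: a cluster of 10 nodes with 8 cores each
x = default_parallelism(10, 8)
print(spark_submit_flag(x))
```

For 10 nodes with 8 cores each, x = 10 * (8 - 1) * 5 = 350.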
Upvotes: 1