Kaushik Ghosh

Reputation: 131

Pyspark Performance Tuning while merging multiple part files

I am kind of new to Spark and I have a requirement where I need to read from different part folders and then merge them all together to create a single df based on a passed schema. It is something like this:

/feed=abc -> contains multiple part folders based on date like below
/feed=abc/date=20221220
/feed=abc/date=20221221
.....
/feed=abc/date=20221231

Each part folder can have multiple part files. All the files are in parquet format, but the schema across two different part folders may vary, either in the number of columns or in data type. So my approach is:

1 - Create an empty final_df based on the passed schema.
2 - Iterate over the list of part folders using the code below.

# list the part folders under the feed directory via the Hadoop FileSystem API
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(inp_fl_loc)

for f in fs.get(conf).listStatus(path):
    path2 = str(hadoop.fs.Path(str(f.getPath())))
    if f.isDirectory():
        path2 = path2 + "/"
        print("the inp_path is ", str(path2))
        # split the folder name to get the partition column name and value, e.g. date=20221220
        temp_path = path2.split("/")[-2]
        part_col, part_val = temp_path.split("=")[0], temp_path.split("=")[1]
    elif '_' in path2.split("/")[-1]:
        # skip metadata files such as _SUCCESS
        continue

    # reading the part folder
    df = spark.read.format(inp_fl_frmt).option("mergeSchema", "true").load(str(path2))

    # other operations follow ...
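For step 1, the empty final_df is simply created from the passed schema, something like this (the schema here is just illustrative; the real one is passed in):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# illustrative schema - the real schema is supplied by the caller
final_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])

# empty DataFrame that the per-folder DataFrames will be unioned into
final_df = spark.createDataFrame([], schema=final_schema)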

3 - Once a particular part folder is read, compare the schema of the read_df with that of the final_df and select only the required columns, typecasting the required columns of the read_df to match the final_df schema where needed. Note that in this process I might have to typecast a sub-column within a struct type column as well. For that I am actually expanding the struct columns into new columns, typecasting them, and then converting them back into the original struct.
4 - Union the typecasted read_df with the final_df.
5 - Repeat steps 3-4 for all the part folders, ultimately giving me the final final_df.
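To illustrate, the alignment in steps 3-4 boils down to something like the sketch below. It is simplified: it uses a direct struct-to-struct cast (which only works when the struct layouts are compatible) instead of the expand-and-rebuild approach I described, and the helper name is just for illustration.

from pyspark.sql import functions as F
from pyspark.sql.types import StructType

def align_to_schema(read_df, target_schema: StructType):
    # build a select list that matches the target schema column by column
    cols = []
    for field in target_schema.fields:
        if field.name in read_df.columns:
            # cast handles simple types; struct-to-struct casts work when the layouts match
            cols.append(F.col(field.name).cast(field.dataType).alias(field.name))
        else:
            # column missing in this part folder -> fill with NULLs of the right type
            cols.append(F.lit(None).cast(field.dataType).alias(field.name))
    return read_df.select(cols)

# steps 3-4: align the read_df to the final schema and union it in
final_df = final_df.unionByName(align_to_schema(df, final_df.schema))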

The thing is, in the presence of large data (in one of my feeds I am reading 340 part folders totalling around 13,000 files, close to 7 GB in total), the job runs for a very long time (7+ hours in the above case). Since I am working on a shared cluster, I don't have the exact details of the number of nodes and cores and am following the standard configuration used in our team... but that does not seem to be enough. Those details are not yet handy and I am trying to get them, but I am more concerned about whether any tuning is possible from the code perspective. A few questions that I have in mind:

Any help regarding this is much appreciated.

Upvotes: 1

Views: 147

Answers (1)

Bartosz Gajda

Reputation: 1167

I think there are several considerations that impact the performance of your job:

  • the plain Python for loop does not distribute the work across the nodes - by processing the folders one at a time from the driver, you are losing the benefit of running a distributed engine like Spark
  • your folder structure already seems quite nicely partitioned, so reading the data even with varying schemas shouldn't be that big of a problem
  • selecting and casting the columns makes the most sense only after you have read all the required files - doing it per folder risks building a large if-else spaghetti to handle every possible case

A simple solution: have you tried reading in all of the desired folders at once, by passing the whole directory to Spark?
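For example, something along these lines (a sketch; mergeSchema reconciles columns that are added or dropped between folders, but it will still fail if the same column changes its data type across folders):

from pyspark.sql import functions as F

# read every date=... part folder under the feed in one pass;
# Spark discovers "date" as a partition column from the folder names
df = (
    spark.read
    .option("mergeSchema", "true")
    .parquet("/feed=abc")
)

# partition pruning: only the matching date folders get scanned
df_subset = df.filter(F.col("date") >= "20221220")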

In general, when you have varying schemas, the sane solution is to have a separate DataFrame for each group of files with a distinct schema, and then use a function like unionByName to combine them. You can set allowMissingColumns to True, so that when, for example, DataFrame A doesn't have some of DataFrame B's columns, the union will contain NULL values there instead of throwing an exception.
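In code that could look roughly like this (allowMissingColumns requires Spark 3.1+; the paths are just placeholders):

df_a = spark.read.parquet("/feed=abc/date=20221220")
df_b = spark.read.parquet("/feed=abc/date=20221221")

# columns present in only one of the DataFrames come through as NULLs in the other
combined = df_a.unionByName(df_b, allowMissingColumns=True)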

Try out any of the solutions, and let me know which one worked the best - always interested in what works for people :)

Upvotes: 0
