DanG
DanG

Reputation: 741

Improve data wrangling performance in Spark.SQL

I have a big database containing several csv files. Each csv file contains the last 10 days and only the oldest date is final data.

For example "file_2019-08-11.csv" file contains data from 08-02 until 08-11 ( only records with date 08-02 in the data are final) and "file_2019-08-12.csv" file contains data from 08-03 until 08-12 ( only records with date 08-03 are final).

I am using PySpark to do that. My aim is to keep only records for date 08-02 from variables_2019-08-11.csv file and records for date 08-03 from variables_2019-08-12.csv file and so on. I am using PySpark and Databricks to do that , my snippet is working but is a bit slow , although I am running it on big enough cluster.

I would gladly take suggestions for other scenarios to improve its performance. Thanks

    import datetime
    # define the period range
    start_date="2019-08-12"
    end_date="2019-08-30



# create list of dates under date_generated variable

    start = datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]

# read first file

    filename="file_variables_"+str(date_generated[0])[0:10]+".csv"
    df=spark.read.csv(data_path+filename,header="true")
    df.createOrReplaceTempView("df")

#create the main file which we will use the other dates to append below this one

    final=spark.sql("select * from df where data_date in (select min(data_date) from df)")

#loop on other dates than the first date 

    for date in date_generated[1:len(date_generated)]:
      filename="file_variables_"+str(date)[0:10]+".csv"
      df=spark.read.csv(data_path+filename,header="true")
      df.createOrReplaceTempView("df")
      temp=spark.sql("select * from df where data_date in (select min(data_date) from df)")
      final=final.union(temp)
    final.createOrReplaceTempView("final")

Upvotes: 1

Views: 136

Answers (1)

Douglas M
Douglas M

Reputation: 1126

I suspect most of the cores on your big cluster are idle because based on the way your code is structured using a loop over each file, your job is processing one file and only using one core in your cluster. Look at clusters -> [Your Cluster] -> Metrics -> [Ganglia UI]

First, it's best to process all of your files as one set. Use input_file_name() if your logic is dependent on the input file name. Do all of your work on the set. Loops will kill your performance.

Second, I think a windowed SQL function dense_rank() will help you with finding the first date of all of the dates in your group [input_file_name()]. Here's a blog introducing window functions: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

df=spark.read.csv(data_path)

from pyspark.sql.functions import input_file_name
df2 = df.withColumn('file_name',input_file_name())

final = df2.<apply logic>

Upvotes: 1

Related Questions