Ammar

Reputation: 45

Spark 12 GB data load with window function: performance issue

I am using Spark SQL to transform 12 GB of data. My transformation applies a row_number window function partitioned by one of the fields, then divides the data into two sets: the first set where row_number is 1, and a second set containing the rest of the data. Both sets are then written to the target location in 30 partitions.

My job currently takes approximately 1 hour. I want to run it in less than 10 minutes.

I am running this job on a 3-node cluster (16 cores and 32 GB RAM per node). Node 1 is the YARN master node. Node 2 runs two executors: one hosting the driver and one for processing. Node 3 runs two executors, both for processing. Each executor is assigned 5 cores and 10 GB of memory.
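For reference, a minimal sketch of how this layout might be expressed when building the SparkContext, assuming one driver plus three processing executors as the node description above suggests (the app name is a placeholder):

from pyspark import SparkConf, SparkContext

# Assumed layout from the description above: 1 driver + 3 executors,
# each with 5 cores and 10 GB of memory. The app name is illustrative.
conf = (SparkConf()
        .setAppName("sales-initial-load")
        .set("spark.executor.instances", "3")
        .set("spark.executor.cores", "5")
        .set("spark.executor.memory", "10g")
        .set("spark.driver.cores", "5")
        .set("spark.driver.memory", "10g"))
sc = SparkContext(conf=conf)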

Is my hardware sufficient, or do I need more powerful hardware? Is the executor configuration right? If both the hardware and the configuration are fine, then I definitely need to improve my code.

My code is as follows.

from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, lit, when
from pyspark.sql.window import Window

sqlContext = SQLContext(sc)

# Read the CSV input (path points to the 12 GB source) and cache it for reuse
SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()

# Rank the records within each id, newest recorddate first
SalesDf_Version = SalesDf.withColumn('row_number', F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))

# Flag the latest record per id as active ('Y') and the rest as inactive ('N')
initialLoad = SalesDf_Version.withColumn('partition', SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).otherwise(lit('N')))
# Note: 'partition' is overwritten here with city, discarding the year value set above
initialLoad = initialLoad.withColumn('version_flag', col('isActive')).withColumn('partition', col('city'))
initialLoad = initialLoad.drop('row_number')

# Write the result, partitioned by the active flag and the partition column
initialLoad.coalesce(1).write.partitionBy('isActive', 'partition').option("header", "true").mode('overwrite').csv(path + 'Temp/target/')

sc.stop()

Thanks in advance for your help

Upvotes: 0

Views: 3685

Answers (2)

Ammar

Reputation: 45

The following are the changes we implemented to improve the performance of our code.

We removed coalesce and used repartition(50). We tried higher and lower numbers, but 50 was the optimal value in our case. We were also using S3 as our target, but it was costing us a lot because of Spark's rename step when committing output to S3, so we switched to HDFS instead, and our job time was reduced to half of what it was before. Overall, with the above changes our code ran in 12 minutes; previously it took 50 minutes. Thanks, Ammar
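A minimal sketch of the changed write, assuming the same initialLoad DataFrame from the question and an hdfs_path variable (an illustrative placeholder) pointing at the HDFS target:

# repartition(50) keeps the write parallel across 50 tasks,
# unlike coalesce(1), which collapses it to a single task.
# hdfs_path is a placeholder for the HDFS target location.
initialLoad.repartition(50).write.partitionBy('isActive', 'partition').option("header", "true").mode('overwrite').csv(hdfs_path + 'Temp/target/')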

Upvotes: 1

Silvio

Reputation: 4207

You have a coalesce(1) before writing; what is the reason for that? Coalesce reduces the parallelization of that stage, which in your case will cause the windowing query to run on one core, so you're losing the benefit of the 16 cores per node.

Remove the coalesce and that should start improving things.
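For illustration, the same write from the question with the coalesce removed, so the windowing stage and the write keep their full parallelism (names taken from the question's code):

# Without coalesce(1), the window query and the write use all
# available cores instead of collapsing to a single task.
initialLoad.write.partitionBy('isActive', 'partition').option("header", "true").mode('overwrite').csv(path + 'Temp/target/')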

Upvotes: 4
