Lavor

Reputation: 81

How to decrease the total processing time of a Spark SQL execution plan

I've just developed a Spark SQL application, and during some algorithm analysis I realized that building the execution plan takes a significant amount of time. How can I optimize the performance of the execution plan for Spark SQL?

I took a look at several questions/answers about this in our community, but nothing seemed straight to the point for this implementation. So I would like some community support to overcome my blocking points, and perhaps leave a roadmap for other developers going down this path.

Below are some details about the effort done so far.

I have developed a Spark application that periodically ingests events from Kafka, processes them, and sends the output back to Kafka. In a nutshell, the Spark algorithm filters/enriches the information and, for each event, performs heavy and complex windowing lag functions.

The Spark algorithm runs in a loop, and each run processes the events currently available (the Kafka topic has a 30-minute retention). At present, each execution cycle takes roughly ~90s, running a batch-mode loop as described below:

  1. Get events from Kafka input topic
  2. Process around 70 Spark SQLs
  3. Send output back to Kafka output topic

Since each cycle takes around 90s, a Kafka event can wait from 90s to 180s to be processed. I must decrease this processing time to 60s.

IMHO, I could scale the Spark hardware to get better SQL performance in batch mode, but since I identified that the major part of the processing is only creating the execution plan, I am wondering what can be done about the execution plan to significantly decrease the processing time.
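To back the claim that plan creation dominates, it helps to measure each phase of the cycle separately. Here is a minimal plain-Python timing helper (no Spark dependency; the phase names in the usage sketch are only illustrative) that can bracket the read, plan-building, and write steps:

```python
import time
from contextlib import contextmanager

# Accumulated wall-clock time per named phase of the batch cycle
phase_totals = {}

@contextmanager
def timed(phase):
    start = time.perf_counter()
    try:
        yield
    finally:
        phase_totals[phase] = phase_totals.get(phase, 0.0) + time.perf_counter() - start

# Usage sketch inside the batch loop (hypothetical phase names):
# with timed("read_kafka"):   streamdata = spark.read.format("kafka")...load()
# with timed("build_plans"):  SUM_COUNT = spark.sql("...")   # ~70 SQL steps
# with timed("write_kafka"):  SendKafka.write...save()
# After a cycle, inspect phase_totals to see which phase dominates the ~90s.
```

Note that Spark is lazy, so part of the planning/optimization cost only shows up when an action (the Kafka write) triggers execution; comparing the phase totals across a few cycles still shows where the time accumulates.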

It is currently running on Spark 3.0.1 in a standalone configuration on a server with 20 vCores and 32GB RAM. Here is a code sample to support this question. I hope it explains the scenario.

Code sample

1. Get Kafka Topic

from pyspark.sql.functions import split, col, to_json, struct
from pyspark.sql.types import IntegerType, StringType

streamdata = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", readtopic) \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Besides the columns identified below, Kafka also provides: offset (auto number) and timestamp (insert datetime)
streamdata = streamdata.withColumn('value', streamdata['value'].cast('string')) \
    .drop('key', 'topic', 'partition', 'timestampType')

streamdata = streamdata.withColumn("LAST_UPDATE", split(col("value"), ",").getItem(0).cast(IntegerType())) \
    .withColumn("DESCRIPTION", split(col("value"), ",").getItem(1).cast(StringType())) \
    .withColumn("MYTAG", split(col("value"), ",").getItem(6).cast(StringType()))
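For reference, the split(col("value"), ",").getItem(i) calls above extract positional fields from the comma-separated Kafka message value. A plain-Python equivalent for a single message (the sample value below is made up) makes the mapping explicit:

```python
# Plain-Python equivalent of the column extraction above,
# applied to one Kafka message value (comma-separated fields).
def parse_value(value: str) -> dict:
    fields = value.split(",")
    return {
        "LAST_UPDATE": int(fields[0]),   # getItem(0).cast(IntegerType())
        "DESCRIPTION": fields[1],        # getItem(1).cast(StringType())
        "MYTAG": fields[6],              # getItem(6).cast(StringType())
    }

row = parse_value("1617000000,sensor A,x,x,x,x,tag42")
```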

2. Window function processing (SEVERAL QUERIES LIKE THIS, OR MORE COMPLEX)

# Register the DataFrame so it can be referenced from Spark SQL
streamdata.createOrReplaceTempView("streamdata")

SUM_COUNT = spark.sql("""
    SELECT FIELD_A, FIELD_B, sum(FIELD_C) OVER (PARTITION BY FIELD_D ORDER BY CAST(LAST_UPDATE as timestamp) RANGE BETWEEN INTERVAL 12 HOURS PRECEDING AND CURRENT ROW) as FIELD_COUNT_12h
    FROM streamdata
""")
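To make the window semantics concrete, here is a plain-Python sketch of what the RANGE BETWEEN INTERVAL 12 HOURS PRECEDING AND CURRENT ROW frame computes per partition. Field names follow the query above; the O(n^2) scan is only for illustration, not how Spark evaluates it:

```python
# For each row, sum FIELD_C over all rows in the same FIELD_D partition
# whose LAST_UPDATE falls within the 12 hours up to and including this row's.
HOURS_12 = 12 * 3600  # window width in seconds

def rolling_12h_sum(rows):
    # rows: list of dicts with FIELD_D (partition key),
    # LAST_UPDATE (epoch seconds), and FIELD_C (value to sum)
    out = []
    for r in rows:
        total = sum(
            other["FIELD_C"]
            for other in rows
            if other["FIELD_D"] == r["FIELD_D"]
            and r["LAST_UPDATE"] - HOURS_12 <= other["LAST_UPDATE"] <= r["LAST_UPDATE"]
        )
        out.append({**r, "FIELD_COUNT_12h": total})
    return out
```

Because the frame is RANGE-based (time-based), two events 13 hours apart never share a window even inside the same partition.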

3. Send data back to Kafka

# query_03 is the result of the last of the ~70 SQL steps
# create value as a concatenated JSON of all columns
SendKafka = query_03.withColumn("value", to_json(struct([query_03[x] for x in query_03.columns])))

# Send back to Kafka
SendKafka.write.format("kafka").option("kafka.bootstrap.servers", kafka_servers).option("topic", writetopic).save()

Upvotes: 4

Views: 1427

Answers (1)

Alex Ott

Reputation: 87164

It's hard to say without looking at the actual execution plan; the code alone isn't enough. Start by tuning spark.sql.shuffle.partitions: set it to the number of cores available to the Spark job. If you have 20 cores and keep the default value of 200, then after the first shuffle each core will execute the code 10 times (200/20), one task after another (in Spark 3 this should be less of an issue because of adaptive query execution). Also take into account that Spark reads data from Kafka based on the number of partitions in the topic, so if you have fewer partitions than cores, some of your cores will be idle while reading. Check the minPartitions option of the Kafka connector (see documentation).
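The 200/20 arithmetic above can be made explicit: each shuffle stage runs in sequential "waves" of tasks, one wave per batch of tasks that fits on the available cores. A small helper (plain Python, illustrative only; the Spark settings appear only as comments):

```python
import math

def shuffle_waves(shuffle_partitions: int, cores: int) -> int:
    # Number of sequential task "waves" per shuffle stage:
    # each core processes ceil(partitions / cores) tasks one after another.
    return math.ceil(shuffle_partitions / cores)

# With the defaults on the 20-core box from the question:
#   shuffle_waves(200, 20) -> 10 waves per stage
# Setting spark.sql.shuffle.partitions to match the core count, e.g.
#   spark.conf.set("spark.sql.shuffle.partitions", "20")
# brings this down to a single wave per stage.
```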

Also, check the recommendations in the Spark SQL tuning guide and the general Spark tuning guide.

Upvotes: 2
