Reputation: 81
I've just developed a Spark SQL application, and during some algorithm analysis I realized that building the execution plan takes a significant share of the processing time. How can I optimize the performance of the execution plan for Spark SQL?
I took a look at several questions/answers about this in the community, but nothing seemed straight to the point for this implementation. So I would like some community support to overcome my blocking points, and perhaps leave a roadmap for other developers going this way.
Below are some details about the effort so far.
I have developed a Spark application that periodically ingests events from Kafka, processes them, and sends the output back to Kafka. In a nutshell, the Spark algorithm filters/enriches the information and, for each event, performs heavy and complex windowed lag functions.
The Spark algorithm runs in a loop, so each run is sized by the number of events it has to process (Kafka retention of 30m). At present, each execution cycle takes roughly ~90s, running a batch-mode loop of about 70 Spark SQLs. Since each cycle takes around 90s, Kafka events can take from 90s to 180s to be processed. I need to decrease this processing time to 60s.
IMHO, I could scale the Spark hardware to get better SQL performance in batch mode, but since I identified that the major part of the algorithm's processing time is spent just creating the execution plan, I am wondering what can be done about the execution plan itself to decrease the processing time significantly.
It is currently running on Spark 3.0.1 in a standalone configuration on a 20-vCore server with 32GB of RAM. Here is a code sample to support this question; hope this explains the scenario.
from pyspark.sql.functions import col, split, to_json, struct
from pyspark.sql.types import IntegerType, StringType

streamdata = spark.read.format("kafka").option("kafka.bootstrap.servers", kafka_servers).\
    option("subscribe", readtopic).option("failOnDataLoss", "false").\
    option("startingOffsets", "earliest").\
    option("endingOffsets", "latest").load()
#Besides the columns identified below, we will bring from Kafka: offset (auto number) and timestamp (insert datetime)
streamdata = streamdata.withColumn('value', streamdata['value'].cast('string')).drop('key', 'topic', 'partition', 'timestampType')
streamdata = streamdata.withColumn("LAST_UPDATE", split(col("value"), ",").getItem(0).cast(IntegerType()))\
    .withColumn("DESCRIPTION", split(col("value"), ",").getItem(1).cast(StringType()))\
    .withColumn("MYTAG", split(col("value"), ",").getItem(6).cast(StringType()))
#Register the DataFrame as a temp view so it can be referenced from spark.sql
streamdata.createOrReplaceTempView("streamdata")
SUM_COUNT = spark.sql("""
SELECT FIELD_A, FIELD_B, sum(FIELD_C) OVER (PARTITION BY FIELD_D ORDER BY CAST(LAST_UPDATE as timestamp) RANGE BETWEEN INTERVAL 12 HOURS PRECEDING AND CURRENT ROW) as FIELD_COUNT_12h
FROM streamdata
""")
#Create value as a concatenated JSON of all columns
SendKafka = SUM_COUNT.withColumn("value", to_json(struct([SUM_COUNT[x] for x in SUM_COUNT.columns])))
#Send back to kafka
SendKafka.write.format("kafka").option("kafka.bootstrap.servers", kafka_servers).option("topic", writetopic).save()
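For reference, the plan that each cycle builds up can be inspected on the final DataFrame before writing. A minimal sketch using the SendKafka DataFrame from above (the mode argument is just one of the options available since Spark 3.0):
# Print the parsed/analyzed/optimized logical plans and the physical plan for this cycle
SendKafka.explain(mode="extended")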
Upvotes: 4
Views: 1427
Reputation: 87164
It's hard to say without looking at the actual execution plan, using only the code. Start by tuning spark.sql.shuffle.partitions
- set it to the number of cores available for the Spark job. If you have 20 cores and keep the default value of 200, then after the first shuffle each core will execute the code 10 times (200/20), one task after another (in Spark 3 this should be less of an issue because of adaptive query execution). Also, take into account that Spark reads data from Kafka based on the number of partitions in the topic, so if you have fewer partitions than cores, some cores will be idle when reading - check the minPartitions
option of the Kafka connector (see the documentation)
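A minimal sketch of how those two knobs could be applied to the job above (the values are only illustrative for a 20-core box; kafka_servers and readtopic are the variables from the question):
from pyspark.sql import SparkSession

# Match shuffle parallelism to the 20 available cores instead of the default 200,
# and let adaptive query execution coalesce shuffle partitions automatically (Spark 3).
spark = SparkSession.builder \
    .appName("kafka-batch-cycle") \
    .config("spark.sql.shuffle.partitions", "20") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Ask the Kafka source for at least as many input partitions as cores,
# even if the topic itself has fewer partitions.
streamdata = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", readtopic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .option("minPartitions", "20") \
    .load()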
Also, check the recommendations in the Spark SQL tuning guide and the general Spark tuning guide.
Upvotes: 2