yyuankm

Reputation: 365

Are there pitfalls in my Spark Structured Streaming code that cause it to slow down after several hours of running?

The Spark job's functions and logic are correct. However, after running for several hours, it becomes slower and slower. Are there any pitfalls in the code below?

val query = "(select * from meta_table) as meta_data"
val meta_schema = new StructType()
  .add("config_id", BooleanType)
  .add("threshold", LongType)
var meta_df = spark.read.jdbc(url, query, connectionProperties)
var meta_df_explode = meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "thresold", "config.*")

//rules_imsi_df: joining of kafka ingestion with the meta_df_explode 

//rules_monitoring_df: static dataframe for monitoring purpose   

val rules_monitoring_stream =
  rules_imsi_df.writeStream
    .outputMode("append")
    .format("memory")
    .trigger(Trigger.ProcessingTime("120 seconds"))
    .foreachBatch {
      (batchDF: DataFrame, batchId: Long) =>
        if (!batchDF.isEmpty) {
          printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
          batchDF.show()
          batchDF.persist()
          var batchDF_group = batchDF.groupBy("id").sum("volume").withColumnRenamed("sum(volume)", "total_volume_id")
          rules_monitoring_df = rules_monitoring_df.join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left").select(rules_monitoring_df("id"), batchDF_group("total_volume_id")).na.fill(0)
          rules_monitoring_df = rules_monitoring_df.withColumn("volume", rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
          batchDF.unpersist()
        }
    }.start()


while (rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  ... //Periodically load meta data from database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "thresold", "config.*")
}

Upvotes: 0

Views: 610

Answers (1)

s.polam

Reputation: 10382

Below are my observations on the code above. I will add more if I find anything else.

  1. Remove format("memory"), as it is meant for debugging, not for production use.
  2. Remove all var declarations from the code.
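  3. Don't assign the DataFrame back to rules_monitoring_df inside foreachBatch. The function runs on the streaming query's thread, not the main thread, so mutating a shared var from it can give wrong results. Each reassignment also stacks another join on top of the previous DataFrame, so its query plan keeps growing with every batch, which is a likely reason for the gradual slowdown. Instead, try saving the result to HDFS or a Hive table and reading it back whenever you need it.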
  4. Try to move the code below into a streaming query listener (StreamingQueryListener) if possible, and avoid the extra while loop.
while (rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  ... //Periodically load meta data from database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "thresold", "config.*")
}
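
To illustrate point 4, here is a rough sketch of what such a listener could look like. The class name MetaRefreshListener, the loadMeta parameter and the refresh interval are my own placeholders, not something from your code, and I have not run this against your job, so treat it as a starting point only.

```scala
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

// Hypothetical helper: reloads the metadata DataFrame from a progress
// callback instead of a separate while/sleep loop on the main thread.
// loadMeta and refreshIntervalMs are assumed parameters.
class MetaRefreshListener(loadMeta: () => DataFrame, refreshIntervalMs: Long)
    extends StreamingQueryListener {

  // AtomicReference so that readers on other threads see a consistent value.
  private val current = new AtomicReference[DataFrame](loadMeta())
  @volatile private var lastRefreshMs: Long = System.currentTimeMillis()

  // Latest metadata DataFrame; call this where the metadata is needed.
  def metaDf: DataFrame = current.get()

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  // Called after progress is reported for a micro-batch.
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val now = System.currentTimeMillis()
    if (now - lastRefreshMs >= refreshIntervalMs) {
      current.set(loadMeta())
      lastRefreshMs = now
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```

You would register it once with spark.streams.addListener(new MetaRefreshListener(() => spark.read.jdbc(url, query, connectionProperties), 240000L)) before starting the query. Keep in mind that reassigning a variable does not change a DataFrame that was already used to define the running query, so the refreshed metadata most likely has to be picked up inside foreachBatch through metaDf.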

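For point 3, a minimal sketch of writing each batch's totals out instead of reassigning rules_monitoring_df could look like the following. The object name BatchVolumeSink, the outputPath parameter and the choice of Parquet are assumptions on my side (a Hive table would work along the same lines); only the id and volume columns come from your code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, sum}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical sink: every micro-batch is aggregated and appended to
// storage, so no DataFrame is carried over from one batch to the next.
object BatchVolumeSink {

  // Aggregates one micro-batch and appends the per-id totals as Parquet.
  // batch_id is stored so a replayed batch can be deduplicated on read.
  def writeBatchTotals(batchDF: DataFrame, batchId: Long, outputPath: String): Unit =
    batchDF
      .groupBy("id")
      .agg(sum("volume").as("total_volume_id"))
      .withColumn("batch_id", lit(batchId))
      .write
      .mode("append")
      .parquet(outputPath)

  def start(rulesImsiDf: DataFrame, outputPath: String): StreamingQuery = {
    // An explicitly typed function value avoids the overload ambiguity
    // between the Scala and Java variants of foreachBatch on Scala 2.12.
    val sink: (DataFrame, Long) => Unit =
      (batchDF, batchId) => writeBatchTotals(batchDF, batchId, outputPath)

    rulesImsiDf.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .foreachBatch(sink)
      .start()
  }
}
```

The running total per id can then be computed on demand, for example with spark.read.parquet(outputPath).groupBy("id").agg(sum("total_volume_id").as("volume")), so the plan of each batch stays the same size no matter how long the job has been running.
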
Upvotes: 1
