Reputation: 365
The Spark job has the correct functions and logic. However, after running for several hours it becomes slower and slower. Are there any pitfalls in the code below?
val query = "(select * from meta_table) as meta_data"
val meta_schema = new StructType()
.add("config_id", BooleanType)
.add("threshold", LongType)
var meta_df = spark.read.jdbc(url, query, connectionProperties)
var meta_df_explode = meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "threshold", "config.*")
//rules_imsi_df: joining of kafka ingestion with the meta_df_explode
//rules_monitoring_df: static dataframe for monitoring purpose
val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()
      var batchDF_group = batchDF.groupBy("id").sum("volume").withColumnRenamed("sum(volume)", "total_volume_id")
      rules_monitoring_df = rules_monitoring_df.join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left").select(rules_monitoring_df("id"), batchDF_group("total_volume_id")).na.fill(0)
      rules_monitoring_df = rules_monitoring_df.withColumn("volume", rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
      batchDF.unpersist()
    }
  }.start()
while (rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  ... // Periodically load metadata from the database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "threshold", "config.*")
}
Upvotes: 0
Views: 610
Reputation: 10382
The points below are my observations from the above code. I will add more if I find anything else.
format("memory")
as its for debugging purpose not for production use case.var
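A minimal sketch of a durable file sink, shown here without the foreachBatch logic; the output and checkpoint paths are illustrative assumptions, not taken from the original job:

import org.apache.spark.sql.streaming.Trigger

// Sketch only: write the stream to parquet with a checkpoint location
// instead of the in-memory sink. Paths are placeholders.
val durable_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/data/rules_imsi/output")            // assumed output path
  .option("checkpointLocation", "/data/rules_imsi/chk") // assumed checkpoint path
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()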
2. Remove var from the code.
3. Don't assign the joined DataFrame back to rules_monitoring_df inside foreachBatch. As this foreachBatch will be invoked by multiple threads at the same time, you might get wrong results. Instead, try to save the result into an HDFS or Hive table and read it back whenever you need it, roughly as sketched below.
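A minimal sketch of that idea, assuming a parquet output path (/data/rules_monitoring/batch_totals is a placeholder, not from the original code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Sketch only: write each micro-batch's aggregates to durable storage instead
// of mutating a driver-side DataFrame; read the accumulated results back later.
val monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    val batchAgg = batchDF
      .groupBy("id")
      .sum("volume")
      .withColumnRenamed("sum(volume)", "total_volume_id")

    batchAgg.write.mode("append").parquet("/data/rules_monitoring/batch_totals") // placeholder path
  }
  .start()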
if possible & avoid extra while
loop.while(rules_monitoring_stream.isActive) {
Thread.sleep(240000)
... //Periodically load meta data from database
meta_df = spark.read.jdbc(url, query, connectionProperties)
meta_df_explode=meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "thresold", "config.*")
}
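A minimal sketch of a listener-based refresh, reusing the variable names from the question (meta_df, meta_df_explode, url, query, connectionProperties, meta_schema); note that onQueryProgress fires after every micro-batch, so add your own throttling if a longer refresh interval is needed:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Sketch only: refresh the metadata on each progress event instead of a
// blocking while-loop with Thread.sleep on the driver.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    meta_df = spark.read.jdbc(url, query, connectionProperties)
    meta_df_explode = meta_df
      .select(col("id"), from_json(col("config"), meta_schema).as("config"))
      .select("config_id", "threshold", "config.*")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})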
Upvotes: 1