Reputation: 41
I deploy a Structured Streaming job with the k8s operator, which simply reads from Kafka, deserializes, adds 2 columns and stores the results in the data lake (I tried both Delta and Parquet), and after a few days the executor memory keeps increasing until eventually I get an OOM. The input rate is really low, in the order of KBs. P.S. I use exactly the same code, but with Cassandra as a sink, and it has been running for almost a month now without any issues. Any ideas, please?
My code
import java.util.Calendar

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", MetisStreamsConfig.bootstrapServers)
  .option("subscribe", MetisStreamsConfig.topics.head)
  .option("startingOffsets", startingOffsets)
  .option("maxOffsetsPerTrigger", MetisStreamsConfig.maxOffsetsPerTrigger)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
  .withColumn("payload", from_json($"value", schema))
  // selection + filtering
  .select("payload.*")
  .select($"vesselQuantity.qid" as "qid", $"vesselQuantity.vesselId" as "vessel_id", explode($"measurements"))
  .select($"qid", $"vessel_id", $"col.*")
  .filter($"timestamp".isNotNull)
  .filter($"qid".isNotNull and !($"qid" === ""))
  .withColumn("ingestion_time", current_timestamp())
  .withColumn("mapping", MappingUDF($"qid"))
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    log.info(s"Storing batch with id: `$batchId`")
    // derive the output directory from the current date
    val calendarInstance = Calendar.getInstance()
    val year  = calendarInstance.get(Calendar.YEAR)
    val month = calendarInstance.get(Calendar.MONTH) + 1
    val day   = calendarInstance.get(Calendar.DAY_OF_MONTH)
    batchDF.write
      .mode("append")
      .parquet(streamOutputDir + s"/$year/$month/$day")
  }
  .option("checkpointLocation", checkpointDir)
  .start()
I changed to foreachBatch because using Delta or Parquet with partitionBy caused the issue to appear even faster; roughly, the sink I replaced looked like the sketch below.
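A simplified sketch of that earlier sink (the date partition columns are assumed here to be derived from ingestion_time; the real code may have differed slightly):

  // approximate reconstruction of the earlier Delta/Parquet sink with partitionBy
  .withColumn("year", year($"ingestion_time"))
  .withColumn("month", month($"ingestion_time"))
  .withColumn("day", dayofmonth($"ingestion_time"))
  .writeStream
  .format("delta") // or "parquet"
  .partitionBy("year", "month", "day")
  .option("path", streamOutputDir)
  .option("checkpointLocation", checkpointDir)
  .start()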
Upvotes: 3
Views: 2068
Reputation: 491
There is a bug that is resolved in Spark 3.1.0.
See https://github.com/apache/spark/pull/28904
Other ways of overcoming the issue, and credit for the debugging work:
https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read
You may find this helpful even though you are using foreachBatch; see the sketch of the related file sink settings below ...
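If you go back to the built-in file/Delta sink (instead of foreachBatch), the sink maintains a _spark_metadata log that is compacted periodically. A minimal sketch of the knobs around that log, assuming the property names from Spark's SQLConf; the values shown are only illustrative, not a confirmed fix:

  // sketch: settings for the streaming file sink's _spark_metadata log
  spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")       // allow expired log entries to be removed
  spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10")  // number of batches between log compactions
  spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "10m")    // how long superseded log files are kept around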
Upvotes: 1
Reputation: 145
I had the same issue with some Structured Streaming Spark 2.4.4 applications writing Delta Lake (or Parquet) output with partitionBy.
It seems to be related to the JVM memory allocation within a container, as thoroughly explained here: https://merikan.com/2019/04/jvm-in-a-container/
My solution (which depends on your JVM version) was to add some options to the YAML definition of my Spark application:
spec:
  javaOptions: >-
    -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap
This way my streaming app is functioning properly, with a normal amount of memory (1GB for the driver, 2GB for the executors).
EDIT: while it seems that the first issue is solved (the controller killing pods for memory consumption), there is still an issue with slowly growing non-heap memory; after a few hours, the driver/executors are killed...
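One thing that might be worth trying for that non-heap growth (just an assumption on my part, not something I have verified) is to give the pods more headroom above the heap via the operator's memoryOverhead fields, for example:

spec:
  driver:
    memory: "1g"
    memoryOverhead: "1g"   # extra room for off-heap / native allocations
  executor:
    memory: "2g"
    memoryOverhead: "1g"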
Upvotes: 0