Giannis Polyzos

Reputation: 41

Structured Streaming OOM

I deploy a Structured Streaming job with the Spark-on-k8s operator. It simply reads from Kafka, deserializes, adds two columns, and stores the results in the data lake (I tried both Delta and Parquet), and after a few days the executor memory keeps increasing until I eventually get an OOM. The input volume is really low, on the order of KBs. P.S. I use the exact same code with Cassandra as a sink, and it has been running for almost a month now without any issues. Any ideas?


My code

import java.util.Calendar

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._

val enriched = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", MetisStreamsConfig.bootstrapServers)
    .option("subscribe", MetisStreamsConfig.topics.head)
    .option("startingOffsets", startingOffsets)
    .option("maxOffsetsPerTrigger", MetisStreamsConfig.maxOffsetsPerTrigger)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]
    .withColumn("payload", from_json($"value", schema))
    // selection + filtering
    .select("payload.*")
    .select($"vesselQuantity.qid" as "qid", $"vesselQuantity.vesselId" as "vessel_id", explode($"measurements"))
    .select($"qid", $"vessel_id", $"col.*")
    .filter($"timestamp".isNotNull)
    .filter($"qid".isNotNull and !($"qid" === ""))
    .withColumn("ingestion_time", current_timestamp())
    .withColumn("mapping", MappingUDF($"qid"))

enriched.writeStream
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      log.info(s"Storing batch with id: `$batchId`")
      val calendarInstance = Calendar.getInstance()
      val year = calendarInstance.get(Calendar.YEAR)
      val month = calendarInstance.get(Calendar.MONTH) + 1 // Calendar months are zero-based
      val day = calendarInstance.get(Calendar.DAY_OF_MONTH)
      batchDF.write
        .mode("append")
        .parquet(streamOutputDir + s"/$year/$month/$day")
    }
    .option("checkpointLocation", checkpointDir)
    .start()

I changed to foreachBatch because using the Delta or Parquet file sink with partitionBy caused the issue to appear even faster. For reference, the earlier writer looked roughly like the sketch below.
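A minimal sketch of the previous partitionBy-based version, reconstructed from memory (the partition column names here are illustrative):

enriched.writeStream
    .format("parquet") // also tried "delta"
    .partitionBy("year", "month", "day") // illustrative partition columns
    .option("path", streamOutputDir)
    .option("checkpointLocation", checkpointDir)
    .start()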

Upvotes: 3

Views: 2068

Answers (2)

Boris

Reputation: 491

There is a bug in the streaming file sink's metadata log that was resolved in Spark 3.1.0.

See https://github.com/apache/spark/pull/28904

Other ways of working around the issue, plus credit for the debugging work:

https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read

You may find this helpful even though you are using foreachBatch ...
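For example, the linked article describes shrinking the file sink's _spark_metadata log via session configs. A minimal sketch, assuming Spark 2.4+ (these keys are internal and may change between versions, so verify them against your release):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .appName("kafka-to-datalake") // illustrative app name
    // Keep the file sink's compacted _spark_metadata log from growing unboundedly:
    .config("spark.sql.streaming.fileSink.log.deletion", "true")      // delete expired log entries
    .config("spark.sql.streaming.fileSink.log.compactInterval", "10") // compact every 10 batches
    .config("spark.sql.streaming.fileSink.log.cleanupDelay", "10m")   // retention for old log files
    .getOrCreate()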

Upvotes: 1

btbbass

Reputation: 145

I had the same issue with some Structured Streaming (Spark 2.4.4) applications writing Delta Lake (or Parquet) output with partitionBy.

It seems to be related to JVM memory allocation within a container, as thoroughly explained here: https://merikan.com/2019/04/jvm-in-a-container/

My solution (which depends on your JVM version) was to add some options to the YAML definition of my Spark application:

spec:
    javaOptions: >-
        -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap

This way my streaming app is functioning properly, with a normal amount of memory (1GB for the driver, 2GB for executors). Note that these flags are experimental; on Java 10+ they are superseded by container support being enabled by default, with the heap fraction set via -XX:MaxRAMPercentage.
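To verify the flag took effect, you can print the heap limit the JVM actually sees; a minimal sketch (the object name is illustrative, run it inside the container):

// Sketch: with the cgroup flag active, the max heap should track the
// container's memory limit rather than the host machine's RAM.
object HeapCheck {
    def main(args: Array[String]): Unit = {
        val maxHeapMiB = Runtime.getRuntime.maxMemory / (1024L * 1024L)
        println(s"Max heap visible to this JVM: $maxHeapMiB MiB")
    }
}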

EDIT: while it seems the first issue is solved (the controller killing pods for memory consumption), there is still an issue with slowly growing non-heap memory; after a few hours, the driver/executors are killed...

Upvotes: 0
