Shayan Salehian

Reputation: 149

How does Spark Structured Streaming handle in-memory state when state data is growing?

In Spark Structured Streaming (version 2.2.0), when using a mapGroupsWithState query with Update as the output mode, Spark seems to store the in-memory state data in a java.util.ConcurrentHashMap data structure. Can someone explain in detail what happens when the state data grows and there isn't enough memory anymore? Also, is it possible to change the limit for storing state data in memory using a Spark config parameter?
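For context, here is a minimal sketch of the kind of query I mean (the event/state types and the rate source are just placeholders for illustration):

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    // Made-up event/state types, just to show the shape of the query.
    case class Event(user: String, action: String)
    case class UserState(user: String, actionCount: Long)

    val spark = SparkSession.builder().appName("state-demo").getOrCreate()
    import spark.implicits._

    // Placeholder stream; any streaming Dataset[Event] would do.
    val events: Dataset[Event] = spark.readStream.format("rate").load()
      .select($"value".cast("string").as("user"), $"value".cast("string").as("action"))
      .as[Event]

    // Per-key state that Spark keeps between micro-batches.
    def updateState(user: String, batch: Iterator[Event],
                    state: GroupState[UserState]): UserState = {
      val current = state.getOption.getOrElse(UserState(user, 0L))
      val updated = current.copy(actionCount = current.actionCount + batch.size)
      state.update(updated) // this is what accumulates in executor memory
      updated
    }

    events.groupByKey(_.user)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateState)
      .writeStream
      .outputMode("update") // Update output mode, as described above
      .format("console")
      .start()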

Upvotes: 4

Views: 2548

Answers (2)

chandan prakash

Reputation: 323

The existing state store implementation uses in-memory HashMaps (for storage) plus HDFS (for fault tolerance). The HashMaps are versioned, one per micro-batch: there is a separate key-value map for each version of every aggregated partition in the executor memory of the worker (the number of versions to maintain is configurable). To answer your questions:
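To make that layout concrete, here is a rough conceptual sketch of the versioning scheme (illustrative Scala only, not Spark's actual source; the real maps are keyed by UnsafeRow, and the retained version count is tied to the spark.sql.streaming.minBatchesToRetain setting, if I remember correctly):

    import java.util.concurrent.ConcurrentHashMap

    object VersionedStateSketch {
      // One key -> value map per version; Spark's real maps hold UnsafeRows.
      type StateMap = ConcurrentHashMap[String, Array[Byte]]

      // version (micro-batch id) -> this partition's state snapshot at that version
      private val loadedMaps = new ConcurrentHashMap[Long, StateMap]()

      // Starting a new batch copies the previous version's map and updates the copy,
      // so several versions can be resident in executor memory at once.
      def loadForUpdate(version: Long): StateMap = {
        val previous = Option(loadedMaps.get(version - 1)).getOrElse(new StateMap())
        val next = new StateMap(previous)
        loadedMaps.put(version, next)
        next
      }
    }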

Can someone explain in detail what happens when the state data grows and there isn't enough memory anymore?

The state store HashMaps share executor memory with shuffle tasks, so as the state grows or the shuffle tasks need more memory, frequent GCs and OOMs will occur, leading to executor failures.

Is it possible to change the limit for storing state data in memory using a Spark config parameter?

Currently that is not possible. You can only specify the total executor memory, which is shared by both the state store and the executor's tasks; there is no way to divide memory between them. This makes the current implementation unreliable under sudden bursts of data, and even watermarks will not help in those cases.
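For completeness, the only sizing knob is the total executor memory, which is normally passed via spark-submit but sketched inline here:

    import org.apache.spark.sql.SparkSession

    // The total executor heap is shared by the state maps AND task/shuffle memory;
    // there is no separate "state store memory" setting to tune.
    // (spark.executor.memory is normally set via spark-submit --executor-memory.)
    val spark = SparkSession.builder()
      .appName("stateful-job")
      .config("spark.executor.memory", "8g") // size generously if state can spike
      .getOrCreate()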
If you are interested in how the state store works internally in Structured Streaming, this article might be useful: https://www.linkedin.com/pulse/state-management-spark-structured-streaming-chandan-prakash/

p.s. I am the author

Upvotes: 3

Yuval Itzchakov

Reputation: 149518

Can someone explain in detail what happens when the state data grows and there isn't enough memory anymore?

The executor will crash with an OOM exception. Since with mapGroupsWithState you're the one in charge of adding and removing state, if you overwhelm the JVM with more data than it can allocate memory for, the process will crash.

Is it possible to change the limit for storing state data in memory using a Spark config parameter?

It isn't possible to limit the number of bytes stored in memory. Again, with mapGroupsWithState you have to manage state in a way that won't cause your JVM to OOM, for example by setting timeouts and removing state, as sketched below. If we're talking about stateful aggregations where Spark manages the state for you, such as with the agg combinator, then you can limit the state with a watermark, which evicts old data from memory once the time frame passes.
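A rough sketch of both approaches (the types, column names, and rate source are placeholders, not anything prescribed by the API):

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.functions.window
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    case class Event(user: String, eventTime: java.sql.Timestamp)
    case class UserState(user: String, count: Long)

    val spark = SparkSession.builder().appName("bounded-state").getOrCreate()
    import spark.implicits._

    // Placeholder stream: the built-in rate source emits (timestamp, value) rows.
    val events: Dataset[Event] = spark.readStream.format("rate").load()
      .select($"value".cast("string").as("user"), $"timestamp".as("eventTime"))
      .as[Event]

    // 1) mapGroupsWithState: you bound the state yourself with timeouts + remove().
    def bounded(user: String, batch: Iterator[Event],
                state: GroupState[UserState]): UserState = {
      if (state.hasTimedOut) {
        val last = state.get
        state.remove() // evicts this key's state, freeing memory
        last
      } else {
        val current = state.getOption.getOrElse(UserState(user, 0L))
        val next = current.copy(count = current.count + batch.size)
        state.update(next)
        state.setTimeoutDuration("30 minutes") // state is dropped if the key goes quiet
        next
      }
    }

    val selfManaged = events.groupByKey(_.user)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(bounded)

    // 2) Built-in aggregation: the watermark lets Spark evict old window state.
    val sparkManaged = events
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window($"eventTime", "5 minutes"), $"user")
      .count()

In the first case eviction is entirely your responsibility; in the second, Spark drops a window's state on its own once the watermark passes it.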

Upvotes: 2
