Reputation: 149
In Spark Structured Streaming (version 2.2.0), when using a mapGroupsWithState query with Update as the output mode, it seems that Spark stores the in-memory state data in a java.util.ConcurrentHashMap data structure. Can someone explain to me in detail what happens when the state data grows and there isn't enough memory anymore? Also, is it possible to change the limit for storing state data in memory using a Spark config parameter?
Upvotes: 4
Views: 2548
Reputation: 323
The existing State Store implementation uses in-memory HashMaps (for storage) plus HDFS (for fault tolerance). The HashMaps are versioned, one per micro-batch: there is a separate key-value map for each version of every aggregated partition in the executor memory of the worker (the number of versions to maintain is configurable). To answer your questions:
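As a rough illustration of that layout (a minimal sketch only; these class and method names are mine, not Spark's internal code), each micro-batch version is essentially a snapshot copy of a per-partition key-value map:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of the versioned state layout, not Spark's actual classes.
// One key-value map exists per (version, partition); older versions are kept
// around for recovery and pruned after a configurable number of batches.
object VersionedStoreSketch {
  type StateMap = ConcurrentHashMap[String, Array[Byte]] // key -> encoded state row

  // micro-batch version -> state map for one aggregated partition
  private val versions = new ConcurrentHashMap[Long, StateMap]()

  def commit(batchId: Long, updates: Map[String, Array[Byte]]): Unit = {
    // Start from the previous version's map, then apply this batch's updates.
    val next = new StateMap(versions.getOrDefault(batchId - 1, new StateMap()))
    updates.foreach { case (k, v) => next.put(k, v) }
    versions.put(batchId, next)
  }
}
```

Every retained version holds a full map in executor memory, which is why the footprint grows with both the number of keys and the number of versions kept.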
Can someone explain to me in detail what happens when the state data grows and there isn't enough memory anymore?
The state store HashMaps share executor memory with the shuffle tasks. So as the state grows, or as shuffle tasks need more memory, frequent GCs and OOMs will occur, leading to executor failures.
is it possible to change the limit for storing state data in memory using a Spark config parameter?
Currently that is not possible. You can only specify the executor memory, which is shared by both the state store and the executor tasks; there is no way to divide the memory between them. This actually makes the current implementation unreliable in the case of sudden data bursts, and even watermarks will not help in those cases.
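In practice the only knobs are the total executor heap and how many state versions are retained. A minimal sketch, assuming you tune these at session startup (spark.sql.streaming.minBatchesToRetain is the version-retention setting, default 100):

```scala
import org.apache.spark.sql.SparkSession

// There is no state-store-specific memory cap; you can only size the heap
// that the state store shares with task execution, and reduce how many
// versions of the state maps are retained.
val spark = SparkSession.builder()
  .appName("stateful-query")
  .config("spark.executor.memory", "8g")                  // heap shared by tasks + state store
  .config("spark.sql.streaming.minBatchesToRetain", "10") // retain fewer state versions
  .getOrCreate()
```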
In case you are interested in how the state store works internally in Structured Streaming, this article might be useful: https://www.linkedin.com/pulse/state-management-spark-structured-streaming-chandan-prakash/
p.s. I am the author
Upvotes: 3
Reputation: 149518
Can someone explain to me in detail what happens when the state data grows and there isn't enough memory anymore?
The executor will crash with an OOM exception. Since with mapGroupsWithState you're the one in charge of adding and removing state, if you're overwhelming the JVM with data it can't allocate memory for, the process will crash.
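For example, a minimal sketch of keeping that state bounded with a processing-time timeout (the Event/KeyCount classes and the one-hour timeout are illustrative assumptions, not from the question):

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(key: String)
case class KeyCount(key: String, count: Long)

// Count events per key, dropping a key's state once it has been idle
// for an hour so the state can't grow without bound.
def updateCount(key: String,
                events: Iterator[Event],
                state: GroupState[Long]): KeyCount = {
  if (state.hasTimedOut) {
    val last = state.get
    state.remove()                      // evict this key's state
    KeyCount(key, last)
  } else {
    val count = state.getOption.getOrElse(0L) + events.size
    state.update(count)
    state.setTimeoutDuration("1 hour")  // re-arm the idle timeout
    KeyCount(key, count)
  }
}

// Wiring it up (assuming `events: Dataset[Event]` is a streaming Dataset):
// events.groupByKey(_.key)
//       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateCount)
```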
is it possible to change the limit for storing the state data in the memory, using a spark config parameter?
It isn't possible to limit the number of bytes you're storing in memory. Again, if this is mapGroupsWithState, you have to manage state in a way that won't cause your JVM to OOM, such as by setting timeouts and removing state. If we're talking about stateful aggregations where Spark manages the state for you, such as with the agg combinator, then you can limit the state using a watermark, which will evict old data from memory once the time frame passes (see the sketch below).
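A minimal sketch of such a watermarked aggregation (assuming a streaming DataFrame `events` with an `eventTime` timestamp column and a `key` column, which are my placeholders):

```scala
import org.apache.spark.sql.functions.{col, count, window}

// Spark manages the aggregation state; the watermark lets it evict state
// for windows that fall more than 10 minutes behind the max event time seen.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("key"))
  .agg(count("*").as("events"))
```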
Upvotes: 2