Mahmoud Hanafy

Reputation: 1897

Where is Spark structured streaming state of mapGroupsWithState stored?

I know that the state is persisted at the checkpoint location as the state store, but I don't know where it is stored while it's still in memory.

I created a streaming job that uses mapGroupsWithState, but the storage memory used by the executors is 0.

Does this mean that the state is stored in execution memory? I can't tell how much memory the state consumes, so I'm not sure whether I need to increase the executor memory.

Also, is it possible to skip checkpointing the state altogether and keep it in memory only?

Upvotes: 2

Views: 1331

Answers (2)

Sumeet Sharma

Reputation: 2583

If you enable checkpointing, the state is stored in a state store. By default it's an HDFSBackedStateStore (created by HDFSBackedStateStoreProvider), but the provider can be overridden with the spark.sql.streaming.stateStore.providerClass setting. A good read on this is https://medium.com/@polarpersonal/state-storage-in-spark-structured-streaming-e5c8af7bf509
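A minimal sketch of overriding the provider, assuming Spark 3.2 or later, where the RocksDB-backed provider ships with Spark and is documented as a way to keep large state out of the JVM heap. The object name StateStoreConfig and its helpers are hypothetical; only the configuration key and the provider class name come from Spark itself.

```scala
import org.apache.spark.sql.SparkSession

object StateStoreConfig {
  // Fully qualified name of the RocksDB-backed provider (Spark 3.2+).
  val RocksDbProvider =
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"

  // Builds a local session whose stateful streaming operators use the given
  // provider instead of the default HDFSBackedStateStoreProvider.
  def session(providerClass: String): SparkSession =
    SparkSession.builder()
      .appName("state-store-provider-example")
      .master("local[*]")
      .config("spark.sql.streaming.stateStore.providerClass", providerClass)
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = session(RocksDbProvider)
    // Prints the provider class that new stateful queries will pick up.
    println(spark.conf.get("spark.sql.streaming.stateStore.providerClass"))
    spark.stop()
  }
}
```

Setting the key only records a class name; the provider itself is instantiated when a stateful query starts.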

As the other answer already mentioned, if you don't enable checkpointing you will lose fault tolerance and at-least-once guarantees.

Upvotes: 0

Michael Heil

Reputation: 18495

As mapGroupsWithState is an aggregation, its state is stored where all aggregations are kept during the lifetime of a Spark application: in execution memory (as you already assumed).
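To see how much memory the state actually takes, the executors' storage memory figure is not the place to look; the query's own progress reports expose it per stateful operator through StreamingQueryProgress.stateOperators (memoryUsedBytes, numRowsTotal), as estimated by the state store provider. A minimal sketch, assuming you hold the StreamingQuery handle returned by start(); the StateMetrics object and its method names are hypothetical.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

object StateMetrics {
  // Sums memoryUsedBytes over every stateful operator in the most recent
  // progress update. lastProgress is null until a progress update exists,
  // hence the Option.
  def stateMemoryBytes(query: StreamingQuery): Option[Long] =
    Option(query.lastProgress).map(_.stateOperators.map(_.memoryUsedBytes).sum)

  // Total number of keys/rows currently held in state across operators.
  def numStateRows(query: StreamingQuery): Option[Long] =
    Option(query.lastProgress).map(_.stateOperators.map(_.numRowsTotal).sum)
}
```

Logging these values after each micro-batch (or reading them from a StreamingQueryListener) shows whether the state keeps growing, which is a more direct signal than the executor memory totals for deciding whether more executor memory is needed.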

Looking at the signature of the method

def mapGroupsWithState[S: Encoder, U: Encoder](
      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] 

you will notice that S is the type of the user-defined state, i.e. the value Spark keeps for each group key K between invocations of func. This is where the state is managed.

As the state is sent to the executors, it must be encodable to Spark SQL types. Therefore, you would typically use a case class in Scala or a JavaBean in Java. GroupState is a typed wrapper object that provides methods to access and manage the state value.
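A minimal sketch of that signature in use, keeping a running click total per user. The types Event, ClickState and ClickTotal and the object RunningClicks are hypothetical; ClickState plays the role of S and is a case class so Spark can derive an encoder for it.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.GroupState

// Hypothetical input, state and output types, for illustration only.
case class Event(user: String, clicks: Int)
case class ClickState(total: Long)
case class ClickTotal(user: String, total: Long)

object RunningClicks {
  // Invoked for each key that has new data in a micro-batch
  // (once per key for a static batch Dataset).
  def updateClicks(
      user: String,
      events: Iterator[Event],
      state: GroupState[ClickState]): ClickTotal = {
    val previous = state.getOption.map(_.total).getOrElse(0L)
    val updated = previous + events.map(_.clicks.toLong).sum
    state.update(ClickState(updated)) // replaces the stored state for this key
    ClickTotal(user, updated)
  }

  def runningTotals(events: Dataset[Event]): Dataset[ClickTotal] = {
    val spark = events.sparkSession
    import spark.implicits._ // encoders for String, ClickState and ClickTotal
    events
      .groupByKey(_.user)
      .mapGroupsWithState[ClickState, ClickTotal](updateClicks _)
  }
}
```

The same function works on a streaming Dataset; on a static Dataset each key starts with empty state, which makes it easy to unit-test the update logic.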

It is crucial that you, as the developer, also take care of how data gets removed from this state, for example by calling state.remove() or by configuring a GroupStateTimeout. Otherwise, your state will only grow and never shrink, and will eventually cause an OOM.
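A minimal sketch of one way to bound the state, assuming a processing-time timeout is acceptable for your use case. The 30-minute idle period and the names ExpiringClicks, Event, ClickState and ClickTotal are hypothetical; hasTimedOut, remove, setTimeoutDuration and GroupStateTimeout.ProcessingTimeTimeout are part of Spark's GroupState API.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical input, state and output types, for illustration only.
case class Event(user: String, clicks: Int)
case class ClickState(total: Long)
case class ClickTotal(user: String, total: Long)

object ExpiringClicks {
  def updateWithExpiry(
      user: String,
      events: Iterator[Event],
      state: GroupState[ClickState]): ClickTotal = {
    if (state.hasTimedOut) {
      // Invoked with no new events because the key was idle too long:
      // emit the last value and drop the key from the state store.
      val last = state.get.total
      state.remove()
      ClickTotal(user, last)
    } else {
      val previous = state.getOption.map(_.total).getOrElse(0L)
      val updated = previous + events.map(_.clicks.toLong).sum
      state.update(ClickState(updated))
      // Re-arm the idle timeout every time this key receives data.
      state.setTimeoutDuration("30 minutes")
      ClickTotal(user, updated)
    }
  }

  def withExpiry(events: Dataset[Event]): Dataset[ClickTotal] = {
    val spark = events.sparkSession
    import spark.implicits._
    events
      .groupByKey(_.user)
      .mapGroupsWithState[ClickState, ClickTotal](GroupStateTimeout.ProcessingTimeTimeout)(updateWithExpiry _)
  }
}
```

An event-time timeout (GroupStateTimeout.EventTimeTimeout together with a watermark) is the alternative when expiry should follow the data's timestamps rather than wall-clock time.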

If you do not enable checkpointing in your structured stream, nothing is persisted, but you have the drawback of losing your state during a failure. If you have enabled checkpointing, e.g. to keep track of the input source, Spark will also store the current state in the checkpoint location.
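A minimal sketch of enabling checkpointing for such a query, assuming a console sink purely for illustration; the object CheckpointedQuery and its start method are hypothetical. The checkpointLocation option and the Update output mode (required for mapGroupsWithState in a streaming query) are standard Spark settings.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object CheckpointedQuery {
  // Starts a streaming query whose offsets, commits and operator state are
  // written under checkpointDir. Restarting with the same directory lets
  // Spark restore the mapGroupsWithState state instead of starting empty.
  def start[T](stateful: Dataset[T], checkpointDir: String): StreamingQuery =
    stateful.writeStream
      // mapGroupsWithState in a streaming query requires Update output mode.
      .outputMode(OutputMode.Update())
      .format("console")
      .option("checkpointLocation", checkpointDir)
      .start()
}
```

After the first micro-batch, the checkpoint directory contains, among others, an offsets folder for the source and a state folder holding the state store files.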

Upvotes: 3
