Yassir S
Yassir S

Reputation: 1042

Spark Streaming: how does mapWithState function work in cluster?

I am using Spark Streaming v2.0.0 to retrieve logs from Kafka and to do some manipulation. I am using the function mapWithState in order to save and update some fields related to a device. I am wondering how this function works in cluster. Indeed, i am just using the standalone mode so far but I will try it later with a Yarn cluster.

However, let's say I have a cluster with several nodes, if a node updates the sate of a device, does he notify immediately all other nodes of this update ? If no, the mapWithState function in cluster needs to be set. And how can I do that ?

Upvotes: 2

Views: 1369

Answers (3)

rakesh
rakesh

Reputation: 2051

Checkpoint is supplied as a directory, so can be from local file system, NFS mounted, HDFS hosted or S3 hosted!!!

Now, consider YARN + HDFS combination. Any data written to checkpoint due to mapWithState will be distributed across different HDFS nodes based on the state's key and spark will attempt to schedule tasks dependent on it on the same node.

But if you consider, YARN + NFS (perhaps not logical at all). Each node should mount the NFS at the same mount point and each read/write request will be an NFS request. This will create a perfect bottleneck!!!

Lets assume, state to manage session of users. We might choose to keep few bites or many GBs of information per user. The key in the state should somehow uniquely identify the user and each time the mapWithState function is triggered, all information saved in the state for that user will be accessible.

Upvotes: -1

Yuval Itzchakov
Yuval Itzchakov

Reputation: 149538

However, let's say I have a cluster with several nodes, if a node updates the state of a device, does he notify immediately all other nodes of this update ? If no, the mapWithState function in cluster needs to be set.

That's not how mapWithState works. mapWithState is a shuffle stage, that means it will cause data in your cluster to move around. How does that affect mapWithState? Each entry (key value pair) will be shuffled to a particular Executor. Upon subsequent arrivals of that same key to whichever Executor was processing it from the input stream at the given time, it will be shuffled to the node holding the in-memory map with the state of previous messages. This is done by default via the HashPartitioner which will hash the key and then send it to the proper Executor holding the state, that's why you need to choose the key carefully.

This means that the state for a particular key isn't spread throughout the cluster. It is assigned to a particular Executor inside the cluster, and the incoming data will keep coming back to the one each time based on the hash of the key.

Upvotes: 5

user7252138
user7252138

Reputation: 1

All stateful transformations shuffle data by key so all values for a specific key are processed on the same executor thread.

There is no need for additional synchronization and state for a particular key is always consistent.

Upvotes: 0

Related Questions