Reputation: 109
I would like to understand some details on how state stores and topic partitions are assigned to Stream Processing applications and their tasks.
Let's say I have a 4-partition topic (tA). I also have 4 instances (i0, i1, i2, i3) of the same application.id (myApp) running on 4 different machines and streaming records from tA. The streaming engine will allocate 1 partition to each application instance. For the sake of argument, let's say the partition allocation is p0->i0, p1->i1, p2->i2 and p3->i3. Also assume my streaming application instances all create their state stores SS0, SS1, SS2, SS3. So basically, SS0 will hold the records (keys) corresponding to p0, SS1 those of p1, etc.
Now, if i0 and i1 go down and i2 and i3 are assigned the additional partitions p0 and p1 respectively, will the state stores that held the p0 and p1 keys also be reassigned along with those partitions?
In short, my question is: do partitions and state stores get associated with each other so that they move together during reassignment? I.e., will we never have the case that the task that gets p0 gets SS1?
Upvotes: 1
Views: 1734
Reputation: 1418
A task reads from one specific partition (or a set of partitions of different topics) and a task also maintains a specific state store. Tasks are the units that are moved between instances during a rebalance.
In your example, the Kafka Streams app will have 4 tasks, t0..t3. Task t0 will read from partition p0, t1 from p1, etc. Each task will maintain its own state store. That means, task t0 will maintain state store SS0, t1 will maintain SS1 and so on.
Let's assume instance i0 executes task t0, i1 executes t1, etc. When instances i0 and i1 go down, tasks t0 and t1 are redistributed to instances i2 and i3. Now, i2 will execute t0 as well as t2, and i3 will execute t1 as well as t3. Since the state stores are part of the tasks, they migrate with them. If the instance to which a stateful task is migrated does not hold up-to-date data for that state, the state store is restored on that instance from the state's changelog topic on the Kafka brokers. Note that a task can also maintain multiple state stores, for instance when the task contains multiple stateful operations.
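To make the restore step more concrete, here is a minimal sketch of the idea in plain Java. It is a toy model, not Kafka Streams code: the class `ChangelogRestoreSketch`, the `Change` type and the `restore` method are all hypothetical names made up for illustration. It assumes the changelog is an ordered list of key/value updates in which a null value marks a deletion, and it rebuilds the store by replaying that list from the beginning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types exist in Kafka Streams.
class ChangelogRestoreSketch {

    // One changelog entry: the value written for a key (null means "deleted").
    static final class Change {
        final String key;
        final Long value;

        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rebuild a store on a new instance by replaying the changelog in order.
    static Map<String, Long> restore(List<Change> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (Change c : changelog) {
            if (c.value == null) {
                store.remove(c.key);       // deletion marker: drop the key
            } else {
                store.put(c.key, c.value); // later updates overwrite earlier ones
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Change> changelog = new ArrayList<>();
        changelog.add(new Change("a", 1L));
        changelog.add(new Change("b", 1L));
        changelog.add(new Change("a", 2L));
        changelog.add(new Change("b", null));
        System.out.println(restore(changelog)); // prints {a=2}
    }
}
```

The point of the sketch is only that the store's content is fully determined by its changelog, which is why the task can be moved to an instance that has no local copy of the state.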
Since a task is bound to its input partitions and its state stores, you will never run into the situation where a task reads from a different partition or maintains a different state store after a migration to a different instance.
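The scenario from the question can be played through with a small plain-Java simulation. Again, this is only a sketch of the model described above and not how Kafka Streams implements assignment: `TaskMigrationSketch`, `Task`, `initialAssignment` and `failOver` are hypothetical names, and the choice of which surviving instance takes over which task is hard-coded to match the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: it mimics the example, not Kafka Streams' real assignor.
class TaskMigrationSketch {

    // A task is fixed to one input partition and one state store.
    static final class Task {
        final String id;
        final String partition;
        final String store;

        Task(String id, String partition, String store) {
            this.id = id;
            this.partition = partition;
            this.store = store;
        }

        @Override
        public String toString() {
            return id + "(" + partition + "," + store + ")";
        }
    }

    // Initial state from the example: instance iN runs task tN = (pN, SSN).
    static Map<String, List<Task>> initialAssignment() {
        Map<String, List<Task>> assignment = new TreeMap<>();
        for (int n = 0; n < 4; n++) {
            List<Task> tasks = new ArrayList<>();
            tasks.add(new Task("t" + n, "p" + n, "SS" + n));
            assignment.put("i" + n, tasks);
        }
        return assignment;
    }

    // Move whole tasks from a failed instance to a survivor.
    // The Task objects are moved unchanged, so partition and store stay paired.
    static void failOver(Map<String, List<Task>> assignment, String failed, String survivor) {
        List<Task> orphaned = assignment.remove(failed);
        if (orphaned != null) {
            assignment.get(survivor).addAll(orphaned);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Task>> assignment = initialAssignment();
        failOver(assignment, "i0", "i2");
        failOver(assignment, "i1", "i3");
        // prints {i2=[t2(p2,SS2), t0(p0,SS0)], i3=[t3(p3,SS3), t1(p1,SS1)]}
        System.out.println(assignment);
    }
}
```

Because the only thing that ever changes is the mapping from instances to tasks, there is no step at which p0 could end up paired with SS1.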
You can find more details about task and state stores under the following links:
Upvotes: 2