Dth

Reputation: 1976

Processor.init() called multiple times for a single task in Kafka Streams

I use a processor that leverages a WALL_CLOCK_TIME punctuation, and I've noticed that after the rebalance phase the init() method gets called more than once for the same task.

I log this line in init():

log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);

And in the logs I can see that it was called twice:

07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7

Also, I log what's happening in the close() method, and I saw that a Cancellable was being cancelled there...

07:53:15 INFO - Closing cancellable org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

...and judging from its identity hashcode (11a53ebd), the existing processor had been reused, but a new one was created as well. As a result, my periodic task has been scheduled twice, instead of once.

I thought that there would be only one processor per task. Any ideas what might have caused this behavior, and how I can prevent it from happening?

Upvotes: 3

Views: 1208

Answers (1)

Matthias J. Sax

Reputation: 62285

During a rebalance, all Processors are closed, and they are initialized again once the rebalance has finished. This is required to make sure no data is lost.

However, the hash you see refers to the registered punctuation (the Cancellable), not to the Processor object. Thus, if you cancel the punctuation in close() and schedule a punctuation in init(), the old schedule will be replaced by a new schedule.
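To illustrate the cancel-in-close() / schedule-in-init() pattern, here is a minimal plain-Java sketch of the lifecycle. It does not use the Kafka Streams library: the Cancellable interface below is a simplified stand-in for org.apache.kafka.streams.processor.Cancellable, and FakeContext and StatisticsProcessor are hypothetical names made up for this example. In a real processor the handle would come from ProcessorContext.schedule(...) with PunctuationType.WALL_CLOCK_TIME.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Kafka Streams' Cancellable (assumption, for illustration only).
interface Cancellable {
    void cancel();
}

// Hypothetical stand-in for the part of ProcessorContext that registers punctuations.
class FakeContext {
    private final List<Runnable> active = new ArrayList<>();

    // Registers a punctuator and returns a handle that unregisters it again.
    Cancellable schedule(Runnable punctuator) {
        active.add(punctuator);
        return () -> active.remove(punctuator);
    }

    int activeSchedules() {
        return active.size();
    }
}

// Hypothetical processor: schedules in init(), cancels in close().
class StatisticsProcessor {
    private Cancellable statisticsSending;

    void init(FakeContext context) {
        // A new handle (with a new identity hash) is created on every init().
        statisticsSending = context.schedule(() -> System.out.println("sending statistics"));
    }

    void close() {
        // Cancel the previous schedule so it does not keep firing after re-initialization.
        if (statisticsSending != null) {
            statisticsSending.cancel();
            statisticsSending = null;
        }
    }
}
```

With this pattern, an init() / close() / init() sequence such as the one around a rebalance leaves exactly one active schedule, even though two different Cancellable objects show up in the logs.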

Upvotes: 2
