Reputation: 1976
I use a processor that leverages a WALL_CLOCK_TIME punctuation, and I've noticed that after the rebalance phase the init() method gets called more than once for the same task.
I log this line in init():
log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);
And in the logs I can see that it was called twice:
07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7
Also, I log what's happening in the close() method, and I saw that a Cancellable was being cancelled there...
07:53:15 INFO - Closing cancellable org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
...and judging from its identity hashcode (11a53ebd), the existing processor had been reused, but a new one was created as well. As a result, my periodic task has been scheduled twice, instead of once.
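For context, here is a minimal sketch of where a suffix like `@11a53ebd` comes from. When a class does not override toString() (which the log format above suggests, though that is an assumption), Java prints the class name, an `@`, and the hex hash code of the object that was logged. `DemoProcessor` and `IdentityHashSketch` are hypothetical names used only for illustration; they are not Kafka Streams classes.

```java
// Hypothetical stand-in for illustration only; not a Kafka Streams class.
class DemoProcessor {
    // Plays the role of the Cancellable field that gets logged.
    final Object cancellable = new Object();
}

class IdentityHashSketch {
    // Rebuilds the default Object.toString() form: class name, '@', hex identity hash.
    static String defaultToString(Object o) {
        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
    }

    public static void main(String[] args) {
        DemoProcessor processor = new DemoProcessor();
        // Logging the field prints the identity of the field's object...
        System.out.println(processor.cancellable);
        System.out.println(defaultToString(processor.cancellable));
        // ...which is computed independently of the enclosing object's identity.
        System.out.println(defaultToString(processor));
    }
}
```

The first two printed lines are identical, so the value after `@` identifies the object passed to the logger, whatever object holds the reference to it.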
I thought that there would be only one processor per task. Any ideas what might have caused this behavior and how I can prevent it from happening?
Upvotes: 3
Views: 1208
Reputation: 62285
During a rebalance, all Processor instances are closed and then initialized again once the rebalance completes. This is required to make sure no data is lost.
However, the hash you see refers to the registered punctuation, not the Processor object. Thus, if you cancel the punctuation in close() and schedule a punctuation in init(), the old schedule will be replaced by a new schedule.
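The cancel-in-close(), schedule-in-init() pattern can be sketched roughly as follows. This is a plain-Java model, not Kafka Streams code: `FakeContext`, `FakeCancellable`, `StatisticsProcessor`, and `PunctuationLifecycleSketch` are hypothetical stand-ins, and the field name `statisticsSending` is borrowed from the log line in the question. In a real processor the handle would be the Cancellable returned by the context's schedule(...) call.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a cancellable schedule handle; NOT the Kafka Streams interface.
interface FakeCancellable {
    void cancel();
}

// Hypothetical context that only tracks which schedules are still active.
class FakeContext {
    private final List<Object> active = new ArrayList<>();

    FakeCancellable schedule(Runnable task) {
        Object handle = new Object();
        active.add(handle);
        // Cancelling removes this schedule from the active list.
        return () -> active.remove(handle);
    }

    int activeSchedules() {
        return active.size();
    }
}

// Hypothetical processor: schedules in init(), cancels in close().
class StatisticsProcessor {
    private FakeCancellable statisticsSending;

    void init(FakeContext context) {
        statisticsSending = context.schedule(() -> { /* periodic work would go here */ });
    }

    void close() {
        if (statisticsSending != null) {
            statisticsSending.cancel();
            statisticsSending = null;
        }
    }
}

class PunctuationLifecycleSketch {
    // Models init -> close -> init around a rebalance and reports the active schedules.
    static int activeSchedulesAfterRebalance() {
        FakeContext context = new FakeContext();
        StatisticsProcessor processor = new StatisticsProcessor();
        processor.init(context);  // task is assigned
        processor.close();        // rebalance closes the processor
        processor.init(context);  // processor is initialized again afterwards
        return context.activeSchedules();
    }

    public static void main(String[] args) {
        System.out.println(activeSchedulesAfterRebalance());
    }
}
```

In this model init() runs twice but only one schedule stays active, because the first handle is cancelled in close() before the second one is registered.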
Upvotes: 2