Reputation: 133
I need help understanding an exception that I receive in one of my Kafka Streams consumers.
I have implemented a Kafka Streams application with the low-level Processor API. Each update we receive from Kafka is merged and written to the state store, so that the state is maintained. Initially we ran only one consumer, and after some time we brought up a second one. During rebalancing, the second consumer threw an exception stating that it had failed to rebalance. This occurred because the end offset of a changelog changed during the restore (exception shared below). I assume that while the rebalance was in progress, the first consumer received some updates and pushed them to the respective changelog. Please help. I am also sharing sample code. I am using Kafka 2_11 0.10.2.1 and the topic has 72 partitions.
Exception
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: task [0_60] Log end offset of Kafka-xxxxxxxxxxxxxxxx-InfoStore-changelog-60 should not change while restoring: old end offset 80638, current offset 80640
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:56)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
Code Snippet
public class InfoProcessor extends AbstractProcessor<Key, Update> {
    private static Logger logger = Logger.getLogger(InfoProcessor.class);
    private ProcessorContext context;
    private KeyValueStore<Key, Info> infoStore;
    private int visitProcessorInstanceId;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
        infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
    }

    @Override
    public void process(Key key, Update update) {
        try {
            if (key != null && update != null) {
                Info info = infoStore.get(key);
                // merge logic
                infoStore.put(key, info);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
        }
        context.commit();
    }

    @Override
    public void punctuate(long timestamp) {
        try {
            KeyValueIterator<Key, Info> iter = this.infoStore.all();
            while (iter.hasNext()) {
                // processing logic
            }
            iter.close();
            context.commit();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
Thank you.
Upvotes: 1
Views: 3084
Reputation: 62285
Your observation and reasoning are correct. This can happen if a rebalance caused by state migration takes a long time and another rebalance happens in the meantime.
Can you verify this? If yes, you need to avoid a second rebalance while state recreation is still running.
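To make the failure mode concrete, here is a simplified sketch of the kind of check behind the "should not change while restoring" message. It is not the actual Kafka Streams source: the `Changelog` class, the `restore` method and the `concurrentWriter` callback are hypothetical stand-ins that use only the JDK. The idea is that the restoring instance remembers the changelog end offset when it starts, replays up to that offset, and fails if the end offset has moved because another instance is still writing.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreCheckSketch {

    /** Hypothetical stand-in for a single changelog partition: an append-only list. */
    public static class Changelog {
        private final List<String> records = new ArrayList<>();

        public void append(String record) {
            records.add(record);
        }

        public long endOffset() {
            return records.size();
        }

        public String read(long offset) {
            return records.get((int) offset);
        }
    }

    /**
     * Replays the changelog up to the end offset observed at the start, then
     * verifies that the end offset did not move in the meantime.
     * concurrentWriter simulates another instance that may still be writing.
     */
    public static List<String> restore(Changelog log, Runnable concurrentWriter) {
        long endOffsetAtStart = log.endOffset();
        List<String> restored = new ArrayList<>();
        for (long offset = 0; offset < endOffsetAtStart; offset++) {
            restored.add(log.read(offset));
        }

        // Another instance may append here while the restore is still in progress.
        concurrentWriter.run();

        long endOffsetNow = log.endOffset();
        if (endOffsetNow != endOffsetAtStart) {
            throw new IllegalStateException(
                "Log end offset should not change while restoring: old end offset "
                    + endOffsetAtStart + ", current offset " + endOffsetNow);
        }
        return restored;
    }

    public static void main(String[] args) {
        Changelog log = new Changelog();
        log.append("key1=v1");
        log.append("key2=v1");

        // No concurrent writer: the restore completes.
        System.out.println("restored " + restore(log, () -> { }).size() + " records");

        // A concurrent writer appends during the restore: the check fails.
        try {
            restore(log, () -> log.append("key1=v2"));
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

In your trace, the old end offset is 80638 and the current offset is 80640, which would fit two records being appended to the changelog by the other instance while task 0_60 was restoring.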
Btw: this behavior is already improved in trunk and will be fixed in the upcoming 0.11.0.1 release. You can update your Kafka Streams application to 0.11.0.1 without needing to upgrade the brokers. 0.11.0.1 should be released in the next couple of weeks.
Upvotes: 3