Reputation: 23
I would like to know how Spring Kafka handles retries when multiple partitions are assigned to one instance. Does it keep retrying the same message according to the retry policy and backoff policy, or does it deliver messages from other partitions in between retry attempts?
Is the behavior:
A) retry message -> retry message -> retry message
B) retry message -> other message -> retry message -> retry message
I've looked at other Stack Overflow questions that seem to confirm that, given a single partition, Spring Kafka will not move to another offset, but I found no information on the behavior when multiple partitions are assigned to the instance. I've implemented a factory that uses a retry template and stateful retry.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ListenerExceptions listenerExceptions = new ListenerExceptions();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(KafkaProperties.CONCURRENCY);
    factory.getContainerProperties().setPollTimeout(KafkaProperties.POLL_TIMEOUT_VLAUE);
    factory.setRetryTemplate(retryTemplate());
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    factory.setStatefulRetry(true);
    factory.setRecoveryCallback((RetryContext context) -> listenerExceptions.recover(context));
    return factory;
}
Upvotes: 0
Views: 693
Reputation: 121542
The retry configuration from that factory is delegated to the RetryingMessageListenerAdapter, whose logic looks like this:
public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
        final Consumer<?, ?> consumer) {
    RetryState retryState = null;
    if (this.stateful) {
        retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
    }
    getRetryTemplate().execute(context -> {
                context.setAttribute(CONTEXT_RECORD, record);
                switch (RetryingMessageListenerAdapter.this.delegateType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
                        break;
                    case ACKNOWLEDGING:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
                        break;
                    case CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
                        break;
                    case SIMPLE:
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record);
                }
                return null;
            },
            getRecoveryCallback(), retryState);
}
So the retry is done per message. Following the Apache Kafka recommendation, each partition is processed in a single thread, so the next record in that partition is not handled until the retries are exhausted or the call succeeds.
Given your multiple-partition setup and the factory.setConcurrency(KafkaProperties.CONCURRENCY); configuration, different partitions may be processed in different threads. Records from different partitions may therefore be retried at the same time, because a retry is tied to its thread and call stack.
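To make the threading point concrete, here is a minimal sketch in plain Java with no Spring classes. It only models the idea described above: one thread per partition, with an in-thread retry loop that blocks the next record of that partition. The class PerPartitionRetrySketch, its methods, and the rule that offset 0 fails until the last attempt are all hypothetical and exist only for illustration; this is not how the container is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation: one worker thread per partition, retries happen inside that thread.
class PerPartitionRetrySketch {

    // Returns a shared log of entries such as "p0-o0-a2" (partition 0, offset 0, attempt 2).
    static List<String> run(int partitions, int recordsPerPartition, int maxAttempts)
            throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        for (int p = 0; p < partitions; p++) {
            final int partition = p;
            pool.submit(() -> {
                for (int offset = 0; offset < recordsPerPartition; offset++) {
                    // The retry loop blocks this thread, so the next offset waits.
                    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                        log.add("p" + partition + "-o" + offset + "-a" + attempt);
                        // Assumption for the demo: offset 0 fails until the last attempt.
                        boolean failed = offset == 0 && attempt < maxAttempts;
                        if (!failed) {
                            break;
                        }
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    // Extracts the entries of a single partition, preserving their order in the shared log.
    static List<String> forPartition(List<String> log, int partition) {
        List<String> result = new ArrayList<>();
        for (String entry : log) {
            if (entry.startsWith("p" + partition + "-")) {
                result.add(entry);
            }
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> log = run(2, 2, 3);
        System.out.println("shared log:  " + log);
        System.out.println("partition 0: " + forPartition(log, 0));
        System.out.println("partition 1: " + forPartition(log, 1));
    }
}
```

Within each partition the entries always follow pattern A (all attempts for offset 0 come before offset 1). In the shared log the entries of the two partitions may interleave, and that order can change from run to run.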
Upvotes: 1