Deepesh verma

Reputation: 669

Implement delayed message processing with spring-kafka

Since Kafka has no built-in support for delayed message visibility, we have to implement it manually.

I am trying to implement it using the approach below:

  1. Add delayInSeconds and eventTime fields to the message payload when sending the message
public class DelayedMessage {
    
  String messageId;
  Instant eventTime;
  Long delayInSeconds;
}
  2. On receiving the message, calculate the exact instant at which the message should be processed
private Instant getInstantWhenToBeProcessed() {
  return eventTime.plus(delayInSeconds, ChronoUnit.SECONDS);
}
  3. Based on the above value and the current time, calculate how long the consumer has to wait
public long getTimeToBeDelayedInMillis() {
  return Duration.between(Instant.now(), getInstantWhenToBeProcessed()).toMillis();
}
  4. If timeToBeDelayed is positive, pause the container, wait for that duration, and resume the container, all on the same thread
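import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class KafkaManager {

  // Logger declared explicitly here (assumption: the original relied on a generated `log` field)
  private static final Logger log = LoggerFactory.getLogger(KafkaManager.class);

  @Autowired
  private KafkaListenerEndpointRegistry registry;
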
  public void pauseThenResumeAfter(long delayInMillis) throws InterruptedException {

    this.pause();

    log.info("Sleeping current thread by: {}ms", delayInMillis);
    Thread.sleep(delayInMillis);
    log.info("Waking up the thread");

    this.resume();

    // Throw an exception so that SeekToCurrentErrorHandler performs a seek and the record is re-processed
    throw new IllegalArgumentException("Failed to process record, as the instantToProcess has not yet arrived. Will retry after backoff");
  }

  public void pause() {
    registry.getListenerContainers().forEach(MessageListenerContainer::pause);
  }

  public void resume() {
    registry.getListenerContainers().forEach(MessageListenerContainer::resume);
  }
}
  5. After the resume call, throw an exception, which makes SeekToCurrentErrorHandler retry the message processing. (If this is not done, the consumer never gets to process the last message.)
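
The arithmetic in steps 2 and 3 can be sketched in isolation as follows. This is a minimal sketch, not the code from the question: `DelayCalculator`, `dueAt` and `remainingMillis` are hypothetical names, the `Clock` parameter is an assumption added so the result can be checked deterministically, and the result is clamped to zero for messages that are already due (the original method returns a negative value in that case).

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper, not part of spring-kafka: computes how long a consumer
// still has to wait before a delayed message becomes due.
final class DelayCalculator {

    private DelayCalculator() {
    }

    // Instant at which the message becomes eligible: eventTime + delayInSeconds.
    static Instant dueAt(Instant eventTime, long delayInSeconds) {
        return eventTime.plusSeconds(delayInSeconds);
    }

    // Remaining wait in milliseconds, clamped to zero once the message is due.
    static long remainingMillis(Instant eventTime, long delayInSeconds, Clock clock) {
        Duration remaining = Duration.between(clock.instant(), dueAt(eventTime, delayInSeconds));
        return Math.max(0L, remaining.toMillis());
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2021-03-01T10:00:00Z");
        // Fixed clock 10 seconds after the event, so the result is deterministic.
        Clock tenSecondsLater = Clock.fixed(eventTime.plusSeconds(10), ZoneOffset.UTC);
        System.out.println(remainingMillis(eventTime, 30, tenSecondsLater)); // prints 20000
    }
}
```

Passing the clock in, rather than calling Instant.now() directly, is only a testing convenience; in a listener one would presumably pass Clock.systemUTC().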

This all works fine: the container goes to sleep and resumes based on the above logic. But I have a few questions:

  1. Do we need to pause and resume the container on the same thread?
  2. Is there another way to seek to the current offset (other than using SeekToCurrentErrorHandler (STCEH), as done here)? What complexity would be involved?
  3. Will the above implementation have any issues when using ConcurrentMessageListenerContainers?
  4. Is there a way to calculate the backoff in the STCEH based on the consumer record payload? If yes, would implementing that keep this KafkaConsumer alive with respect to the broker (that is, as long as the delay does not exceed max.poll.interval.ms)?
  5. And lastly, what are the possible drawbacks of the above implementation that would make it a non-viable option?

Thanks!

Upvotes: 0

Views: 1886

Answers (1)

Gary Russell

Reputation: 174799

If you call pauseThenResumeAfter() from the listener thread, it will have no effect; the container is not actually paused until the listener method returns.

You should look at the non-blocking retries feature, new in version 2.7.

You should be able to re-use some of its infrastructure to implement your requirements.
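
Independently of the retry infrastructure, the point about the listener thread suggests requesting the pause and leaving the resume to another thread, so the listener method can return instead of sleeping. The following is only a sketch under stated assumptions: `Pausable` is a hypothetical stand-in for the pause()/resume() pair that the question calls on MessageListenerContainer, and `DelayedResumer` is an illustrative name, not a spring-kafka class. The listener would presumably still need to trigger a re-seek (for example by throwing, as in the question) so that the record is redelivered after the resume.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the pause()/resume() pair on a listener container.
interface Pausable {
    void pause();

    void resume();
}

// Requests a pause and schedules the resume on a separate thread, so the
// calling (listener) thread can return immediately instead of sleeping.
final class DelayedResumer {

    private final ScheduledExecutorService scheduler;

    DelayedResumer(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    // Returns the future of the scheduled resume so callers can cancel or await it.
    ScheduledFuture<?> pauseThenResumeAfter(Pausable container, long delayInMillis) {
        container.pause();
        Runnable resumeTask = container::resume;
        // Negative delays are treated as "resume as soon as possible".
        return scheduler.schedule(resumeTask, Math.max(0L, delayInMillis), TimeUnit.MILLISECONDS);
    }
}
```

Because nothing sleeps on the calling thread, the time spent inside the listener method no longer grows with the requested delay.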

Upvotes: 1
