Reputation: 669
As kafka does not have implicit support for delayed message visibiliy, we have to implement it manually.
I am trying to implement it using the below approach -
delayInSeconds
and eventTime
filed in the message payload, while sending the messagepublic class DelayedMessage {
String messageId;
Instant eventTime;
Long delayInSeconds;
}
private Instant getInstantWhenToBeProcessed() {
return eventTime.plus(delayInSeconds, ChronoUnit.SECONDS);
}
public long getTimeToBeDelayedInMillis() {
return Duration.between(Instant.now(), getInstantWhenToBeProcessed()).toMillis();
}
timeToBeDelayed
is positive, then pause the contaienr, wait for the duration and resume the container - on the same thread@Service
public class KafkaManager {
@Autowired
private KafkaListenerEndpointRegistry registry;
public void pauseThenResumeAfter(long delayInMillis) throws InterruptedException {
this.pause();
log.info("Sleeping current thread by: {}ms", delayInMillis);
Thread.sleep(delayInMillis);
log.info("Waking up the thread");
this.resume();
// Throw exception, so that SeekToCurrentErrorHandle does a seek and tries to re-process
throw new IllegalArgumentException("Failed to process record, as they instantToProcess has not yet arrived. Will retry after backoff");
}
public void pause() {
registry.getListenerContainers().forEach(MessageListenerContainer::pause);
}
public void resume() {
registry.getListenerContainers().forEach(MessageListenerContainer::resume);
}
}
This all works fine and the container goes to sleep and resumes based on the above logic. BUT I have a few questions -
Thanks !
Upvotes: 0
Views: 1886
Reputation: 174799
If you call the pauseThenResumeAfter()
from the listener thread, it will have no effect; the container is not actually paused until the listener method executes.
You should look at the new 2.7 feature Non-blocking retries.
You should be able to re-use some of its infrastructure to implement your requirements.
Upvotes: 1