Reputation: 137
We have a scenario in our system where user details are published to Kafka topic XYZ by another producing application A (a different system), and my application B consumes from that topic.
The requirement is that application B needs to consume the event 45 minutes (or some configurable time) after it was put on Kafka topic XYZ by A. The reason for this delay is that a REST API of some system C needs to be called, based on this user-details event, to confirm whether a flag is set for that particular user, and that flag can be set at any point during those 45 minutes. (This could have been solved differently, but C does not have the capability to publish to Kafka or notify us in any other way.)
Our application B is written in Spring.
The solution I tried was to take the event from Kafka, check the timestamp of the first event in the queue and, if that event is already 45 minutes old, process it; if it is less than 45 minutes old, pause the Kafka listener container for the remaining time, using the MessageListenerContainer pause() method, until the 45 minutes have elapsed. Something like below:
@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
public void delayedConsumer(@Payload String message, Acknowledgment acknowledgment) {
    UserDataEvent userDataEvent;
    try {
        userDataEvent = this.mapper.readValue(message, UserDataEvent.class);
    } catch (JsonProcessingException e) {
        logger.error("error while parsing message", e);
        return;
    }
    MessageListenerContainer delayedContainer =
            this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
    // delayMillis is the configured delay, e.g. 45 minutes
    long processAt = userDataEvent.getPublishTime() + delayMillis;
    if (System.currentTimeMillis() < processAt) {
        long sleepTimeForPolling = processAt - System.currentTimeMillis();
        // negative ack so this and the following polled records are redelivered later
        acknowledgment.nack(1000);
        // pause the container, and resume it once the delay has elapsed
        delayedContainer.pause();
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.schedule(delayedContainer::resume,
                sleepTimeForPolling, TimeUnit.MILLISECONDS);
        return;
    }
    // if the message is already 45 minutes old, process it
    this.service.processMessage(userDataEvent);
    acknowledgment.acknowledge();
}
Though this works for a single partition, I am not sure whether it is the right approach; any comments on that? I also see that with multiple partitions it will cause problems, since the pause() call above pauses the whole container: if one partition has an old message, it will not be consumed while the container is paused because of a newer message on some other partition. Can I apply this pause logic at the partition level somehow?
Is there a better/recommended solution for achieving this delayed processing after a configurable amount of time that I could adopt in this scenario, rather than what I did above?
Upvotes: 1
Views: 3288
Reputation: 174514
Kafka is not really designed for such scenarios.
One way I could see that technique working would be to set the container concurrency to the same as the number of partitions in the topic, so that each partition is processed by a different consumer on a different thread; then pause/resume the individual Consumer<?, ?>s instead of the whole container.
To do that, add the Consumer<?, ?> as an additional parameter on the listener method; to resume the consumer, set the idleEventInterval and check the timer in an event listener (for ListenerContainerIdleEvent). The Consumer<?, ?> is a property of the event, so you can call resume() there.
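For reference, here is a rough sketch of how those pieces could fit together. It assumes manual acknowledgments, a JSON deserializer that produces the question's UserDataEvent (with its getPublishTime()), a topic with 3 partitions, a 45-minute delay, and that idleEventInterval has been set on the container factory; the resumeAt bookkeeping and every name not taken from the question are illustrative, not a definitive implementation:

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Sketch only: per-partition consumers (concurrency = partition count), pausing the
// individual Consumer instead of the container, and resuming it from an idle event.
// Requires factory.getContainerProperties().setIdleEventInterval(...) so that
// ListenerContainerIdleEvent is published, and a manual AckMode for the Acknowledgment.
@Component
public class DelayedUserDataListener {

    private static final long DELAY_MS = Duration.ofMinutes(45).toMillis();

    // earliest time at which each paused partition may be resumed
    private final Map<TopicPartition, Long> resumeAt = new ConcurrentHashMap<>();

    @KafkaListener(id = "delayed_listener", topics = "test_topic",
            groupId = "test_group", concurrency = "3")
    public void listen(ConsumerRecord<String, UserDataEvent> record,
            Acknowledgment ack, Consumer<?, ?> consumer) {

        long readyAt = record.value().getPublishTime() + DELAY_MS;
        if (System.currentTimeMillis() < readyAt) {
            ack.nack(1000);                          // re-seek so the record is redelivered later
            consumer.pause(consumer.assignment());   // pause only this consumer's partitions
            this.resumeAt.put(new TopicPartition(record.topic(), record.partition()), readyAt);
            return;
        }
        process(record.value());                     // old enough: process and commit
        ack.acknowledge();
    }

    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        // each child container's id is "delayed_listener-0", "-1", ...
        if (!event.getListenerId().startsWith("delayed_listener")) {
            return;
        }
        long now = System.currentTimeMillis();
        for (TopicPartition tp : event.getTopicPartitions()) {
            Long readyAt = this.resumeAt.get(tp);
            if (readyAt != null && now >= readyAt) {
                this.resumeAt.remove(tp);
                // the event exposes the paused Consumer, so it can be resumed here
                event.getConsumer().resume(event.getConsumer().paused());
            }
        }
    }

    private void process(UserDataEvent event) {
        // business logic, i.e. the question's service.processMessage(...)
    }
}

Note that the idleEventInterval effectively becomes the resolution of the delay, so the actual wait can overshoot the configured 45 minutes by up to one interval.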
Upvotes: 2