Reputation: 174
I need to consume data from several topics in a specified order. Let's say we have three topics called topic-1, topic-2, and topic-3. I need to make sure that I consume them in the following order:
topic-2 > topic-3 > topic-1
The application should read all the messages in topic-2, then proceed to consume from topic-3, and then from topic-1. It should repeat this cycle for as long as the topics keep receiving messages.
Is this possible with Kafka?
Upvotes: 0
Views: 81
Reputation: 24527
I'm not sure if you have any special constraints, but you can try to do this in your application code:
consumeAll(topic2); // when done, consume next topic
consumeAll(topic3); // when done, consume next topic
consumeAll(topic1);
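A minimal sketch of that idea, with the caveat that it does not use the Kafka client at all: each "topic" is a plain in-memory queue, and the names OrderedDrain and consumeInOrder are made up for illustration. It only shows the control flow of draining one source completely before moving to the next.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory stand-in: each "topic" is a plain queue, not a real Kafka consumer.
final class OrderedDrain {

    // Removes and returns everything currently queued for one topic.
    static List<String> consumeAll(Queue<String> topic) {
        List<String> drained = new ArrayList<>();
        String msg;
        while ((msg = topic.poll()) != null) {
            drained.add(msg);
        }
        return drained;
    }

    // Drains the topics in the map's iteration order
    // (insertion order when a LinkedHashMap is passed in).
    static List<String> consumeInOrder(Map<String, Queue<String>> topicsInPriorityOrder) {
        List<String> result = new ArrayList<>();
        for (Queue<String> topic : topicsInPriorityOrder.values()) {
            result.addAll(consumeAll(topic));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Queue<String>> topics = new LinkedHashMap<>();
        topics.put("topic-2", new ArrayDeque<>(List.of("b1", "b2")));
        topics.put("topic-3", new ArrayDeque<>(List.of("c1")));
        topics.put("topic-1", new ArrayDeque<>(List.of("a1", "a2")));
        System.out.println(consumeInOrder(topics)); // [b1, b2, c1, a1, a2]
    }
}
```

With a real consumer, "everything currently queued" has no natural end, so you would have to decide yourself when a topic counts as drained.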
But beware: If new messages get appended to multiple topics at the "same" time, you won't be able to recreate the insertion order in your application code, because Kafka only guarantees order within a single topic partition, not across multiple partitions or multiple topics.
You could work with timestamps embedded in the Kafka messages, so that you can tell which one came first:
{ messageId, payload, timestamp }
Using timestamps means that all your producers must use a synchronized clock. Otherwise, a drift of even a few milliseconds can break the ordering.
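As a rough illustration of the timestamp approach, here is a sketch that merges batches read from several topics and sorts them by the embedded timestamp. The Message class and the mergeByTimestamp helper are hypothetical, and the sketch assumes the timestamps are epoch milliseconds written by producers whose clocks are in sync.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class TimestampMerge {

    // Hypothetical message shape mirroring { messageId, payload, timestamp }.
    static final class Message {
        final String messageId;
        final String payload;
        final long timestamp; // assumed: epoch millis set by the producer

        Message(String messageId, String payload, long timestamp) {
            this.messageId = messageId;
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // Returns a new list with all messages, oldest first.
    // The sort is stable, so equal timestamps keep their batch order.
    @SafeVarargs
    static List<Message> mergeByTimestamp(List<Message>... batches) {
        List<Message> merged = new ArrayList<>();
        for (List<Message> batch : batches) {
            merged.addAll(batch);
        }
        merged.sort(Comparator.comparingLong((Message m) -> m.timestamp));
        return merged;
    }

    public static void main(String[] args) {
        List<Message> fromTopic2 = List.of(new Message("t2-a", "x", 30L));
        List<Message> fromTopic3 = List.of(new Message("t3-a", "y", 10L));
        List<Message> fromTopic1 = List.of(new Message("t1-a", "z", 20L));
        for (Message m : mergeByTimestamp(fromTopic2, fromTopic3, fromTopic1)) {
            System.out.println(m.timestamp + " " + m.messageId);
        }
    }
}
```

Note that this only orders the messages you have already fetched. A message with an older timestamp can still arrive later.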
But then you run into the next problem: how long do you want to wait before you start processing (for example, if topic3 doesn't receive new messages)?
Another thing to consider: you receive a new message from topic3. What should happen now? According to your description, you can't process it, because there must be a message from topic2 first. How long do you want to wait for a new message from topic2?
It may be better to listen only to topic2 at first. Only when you receive a message from topic2 do you start fetching messages from topic3. Once you get something from topic3, you start listening to topic1. Then you start all over again.
Something like this:
while (true) {
    msg2 = consumeAllNewMessages(topic2); // blocking call, until message received
    msg3 = consumeAllNewMessages(topic3); // blocking, too
    msg1 = consumeAllNewMessages(topic1); // blocking, too
    process(msg2, msg3, msg1);
}
(Of course you could (and should) replace the blocking calls with some non-blocking code, for example using CompletableFuture)
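One possible non-blocking shape for a single round, sketched with CompletableFuture. The fetch function is a hypothetical placeholder for whatever asynchronous "consume all new messages" call you build on top of your consumer; it is not part of the Kafka API. Chaining with thenCompose means the fetch for topic-3 starts only after topic-2 has delivered, and topic-1 only after topic-3.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncOrderedConsume {

    // fetch is a hypothetical non-blocking call supplied by the caller:
    // given a topic name, it completes once that topic has delivered something.
    static CompletableFuture<List<String>> consumeRound(
            Function<String, CompletableFuture<String>> fetch) {
        // Each stage is requested only after the previous one has completed.
        return fetch.apply("topic-2").thenCompose(msg2 ->
               fetch.apply("topic-3").thenCompose(msg3 ->
               fetch.apply("topic-1").thenApply(msg1 ->
                       List.of(msg2, msg3, msg1))));
    }

    public static void main(String[] args) {
        // Fake fetch that answers immediately on a background thread.
        Function<String, CompletableFuture<String>> fakeFetch =
                topic -> CompletableFuture.supplyAsync(() -> "msg-from-" + topic);
        // join() is used here only so the demo prints before the JVM exits.
        System.out.println(consumeRound(fakeFetch).join());
    }
}
```

To repeat the cycle, you would start the next consumeRound from a callback on the returned future instead of looping with blocking calls.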
But again: this only guarantees the order in which you consume these topics. It doesn't tell you in which order the messages (across the topics) were sent to Kafka. For that you need embedded timestamps with synchronized clocks.
Upvotes: 1