Reputation: 53
I have a Kafka Streams topology that uses the Processor API. Inside the processor, there is logic that calls an external API.
In case the API returns a 503, the message needs to be retried.
Right now I am pushing such messages to a different Kafka topic and using the punctuate method to pull a batch of messages from that failed topic every minute and retry them.
Is there a better way/approach to this problem?
Upvotes: 2
Views: 1654
Reputation: 503
A different yet robust approach would be to use a state store. State stores are backed by Kafka as compacted changelog topics, so their contents survive application restarts.
You can store failed messages in the state store, re-process all of them from a scheduled punctuation, and delete the ones that were processed successfully.
For example:
import java.time.Duration;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor implements Processor<String, Object> {

    private final long schedulerIntervalMs = 60000;
    private final String entityStoreName = "failed-message-store";

    private KeyValueStore<String, Object> entityStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.entityStore = (KeyValueStore<String, Object>) context.getStateStore(entityStoreName);
        // Retry all stored failed messages once per minute, based on wall-clock time.
        context.schedule(Duration.ofMillis(this.schedulerIntervalMs), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> processFailedMessagesStore());
    }

    @Override
    public void process(String key, Object value) {
        boolean apiCallSuccessful = callExternalApi(key, value);
        if (!apiCallSuccessful) {
            // Keep the failed message so the punctuator can retry it later.
            entityStore.put(key, value);
        }
    }

    private void processFailedMessagesStore() {
        // The store iterator must be closed, hence try-with-resources.
        try (KeyValueIterator<String, Object> allItems = entityStore.all()) {
            allItems.forEachRemaining(item -> {
                boolean successfullyProcessed = callExternalApi(item.key, item.value);
                if (successfullyProcessed) {
                    entityStore.delete(item.key);
                }
            });
        }
    }

    private boolean callExternalApi(String key, Object value) {
        // Placeholder for your actual API call; return true on success.
        return true;
    }

    @Override
    public void close() { }
}
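For this to work, the state store has to be registered with the topology and connected to the processor; getStateStore() only resolves stores attached that way. A minimal wiring sketch, assuming String keys and values for simplicity (the topic and processor names here are illustrative; in practice use a Serde matching your actual value type):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// A persistent store; Kafka Streams backs it with a compacted changelog
// topic by default, so pending retries survive restarts.
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("failed-message-store"),
                Serdes.String(),
                Serdes.String());

Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("my-processor", MyProcessor::new, "source");
// Connect the store to the processor so getStateStore() can find it by name.
topology.addStateStore(storeBuilder, "my-processor");

Because the store is changelog-backed, messages waiting for a retry are not lost on restarts or rebalances, which is what makes this more robust than buffering them in memory.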
Upvotes: 3