Reputation: 5612
I have a Kafka stream that is set up as follows:
I know how to set this up for the happy path (REST endpoint returns HTTP 2xx). My question is: if my endpoint returns 400, how do I configure my kstream to handle that and not emit a message?
Here's my KStream config:
final KStream<String, Message> stream = streamsBuilder.stream(topic,
        Consumed.with(Serdes.String(), messageJsonSerde));
stream
    .filter((key, value) -> MessageType.CASE_MATCHED.equals(
            value.getEventType()))
    .mapValues(notifierService::handleNotification) // calls REST endpoint
    .to(topic, Produced.with(Serdes.String(), messageJsonSerde));
return stream;
Upvotes: 0
Views: 54
Reputation: 62285
I guess the simplest way would be to add another filter after the mapValues that drops messages that were not processed successfully. You could change the first mapValues to add this information to the record so that the filter can access it. Additionally, you can add a second mapValues after the second filter to remove the extra success flag, so that an unmodified message is written back to Kafka.
Of course, this won't reprocess any failed message but only filter out failed messages from the output.
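A minimal sketch of that wrap, filter, unwrap idea follows. To keep it runnable without a broker it uses a plain java.util.stream pipeline in place of a KStream, and everything in it is an assumption made for illustration: the `Processed` wrapper, the `callEndpoint` stand-in for the REST call, and the use of `String` messages instead of the question's `Message` type.

```java
import java.util.List;
import java.util.stream.Collectors;

public class NotifyFilterSketch {

    // Hypothetical wrapper: the original value plus a success flag.
    static final class Processed<V> {
        final V value;
        final boolean success;

        Processed(V value, boolean success) {
            this.value = value;
            this.success = success;
        }
    }

    // Hypothetical stand-in for the REST call; returns an HTTP status code.
    static int callEndpoint(String message) {
        return message.startsWith("bad") ? 400 : 200;
    }

    // Step 1: call the endpoint and record whether it returned 2xx.
    static Processed<String> handle(String message) {
        int status = callEndpoint(message);
        return new Processed<>(message, status >= 200 && status < 300);
    }

    static List<String> process(List<String> input) {
        return input.stream()
                .map(NotifyFilterSketch::handle) // attach the success flag
                .filter(p -> p.success)          // step 2: drop failed messages
                .map(p -> p.value)               // step 3: strip the flag again
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [ok-1, ok-3]
        System.out.println(process(List.of("ok-1", "bad-2", "ok-3")));
    }
}
```

In the actual topology the same three steps would presumably become `.mapValues(...)`, `.filter((key, p) -> p.success)` and `.mapValues(p -> p.value)` placed before `.to(...)`, with `handleNotification` changed to return the wrapper. As noted above, failed messages are only dropped from the output, not retried.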
Upvotes: 1