Mark
Mark

Reputation: 5612

How do I handle Kafka KStream unexpected state?

I have a Kafka stream that is set up as follows:

  1. A message comes in on topic A with state "created"
  2. A remote REST endpoint is called to notify that the message was received
  3. The message is then transformed (state is set to "notified") and sent back out on topic A

I know how to set this up for the happy path (REST endpoint returns HTTP 2xx). My question is: if my endpoint returns 400, how do I configure my kstream to handle that and not emit a message?

Here's my KSteam config:

    final KStream<String, Message> stream = streamsBuilder.stream(topic,
            Consumed.with(Serdes.String(), messageJsonSerde));
    stream
            .filter((key, value) -> MessageType.CASE_MATCHED.equals(
                    value.getEventType()))
            .mapValues(notifierService::handleNotification) // calls REST endpoint
            .to(topic, Produced.with(Serdes.String(), messageJsonSerde));
    return stream;

Upvotes: 0

Views: 54

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62285

I guess the simplest way would be, to add another filter after the mapValue that drops messages that got not successfully processed.

You could change the first mapValue to add this information to the record such that the filter can access it. Additionally, you can do a second mapValue after the second filter to remove this additional success flag to write an unmodified message back to Kafka.

Of course, this won't reprocess any failed message but only filter out failed messages from the output.

Upvotes: 1

Related Questions