Peter
Peter

Reputation: 333

Error handling in Spring Cloud Kafka Streams

I'm using Spring Cloud Stream with Kafka Streams. Let's say I have a processor which is a Function which converts a KStream of Strings to a KStream of CityProgrammes. It invokes an API to find the City by name and an other transformation which finds any events near that city.

Now the problem is that any error happens during the transformation, the whole application stops. I want to send that one particular message to a DLQ and move along. I've been reading for days and everyone suggests to handle errors within the called services but that is a nonesense in my opinion, plus I still need to return a KStream: how do I do that within a catch?

I also looked at UncaughtExeptionHandler but it is not aware of the message and only able to restart the processing which won't skip this invalid message.

This might sound like an A-B problem so the question rephrased: how do I maintain the flow in a KStream when an exception occurs and send the invalid item to the DLQ?

Upvotes: 2

Views: 3189

Answers (1)

sobychacko
sobychacko

Reputation: 5924

When it comes to the application-level errors you have, it is up to the application itself how the error is handled. Kafka Streams and the Spring Cloud Stream binder mainly support deserialization and serialization errors at the framework level. Although that is the case, I think your scenario can be handled. If you are using Kafka Client prior to 2.8, here is an SO answer I gave before on something similar: https://stackoverflow.com/a/66749750/2070861

If you are using Kafka/Streams 2.8, here is an idea that you can use. However, the code below should only be used as a starting point. Adjust it according to your use case. Read more on how branching works in Kafka Streams 2.8. The branching API is significantly refactored in 2.8 from the prior versions.

public Function<KStream<?, String>, KStream<?, Foo>> convert() {
            Foo[] foo = new Foo[0];
            return input -> {
                final Map<String, ? extends KStream<?, String>> branches =
                        input.split(Named.as("foo-")).branch((key, value) -> {
                                    try {
                                        foo[0] = new Foo(); // your API call for CitiProgramme converion here, possibly.
                                        return true;
                                    }
                                    catch (Exception e) {
                                        Message<?> message = MessageBuilder.withPayload(value).build();
                                        streamBridge.send("to-my-dlt", message);
                                        return false;
                                    }

                                }, Branched.as("bar"))
                                .defaultBranch();

                final KStream<?, String> kStream = branches.get("foo-bar");
                return kStream.map((key, value) -> new KeyValue<>("", foo[0]));
            };

        }


    }

The default branch is ignored in this code because that only contains the records that threw exceptions. Those were handled by the catch statement above in which we send the records to a DLT programmatically. Finally, we get the good records and map them to a new KStream and send it through the outbound.

Upvotes: 1

Related Questions