glockman

Reputation: 189

How to Handle/Intercept Exceptions Thrown in Spring Cloud Stream (Elmhurst/2.0)

I am currently having a problem handling exceptions thrown during message processing in Spring Cloud Stream (Elmhurst.RELEASE). My application throws an exception in the main processing method:

@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String process(String message) {
        externalService();
        return message.toUpperCase();
    }

    private void externalService() {
        throw new RuntimeException("An external call failed");
    }
}

I seem utterly unable to catch this exception on any error channel. From reading the documentation, I expected to receive an ErrorMessage on either the global "errorChannel" or the specific "input.myGroup.errors" channel.

I have attempted to use Spring Integration listeners:

@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}

As well as Spring Cloud Stream listeners:

@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}

with no success. I have also tried various combinations of configuration settings such as "errorChannelEnabled: true" and "max-retries: 1", none of which let me catch the error thrown by my Processor (I checked with a breakpoint on "return message"). I don't even receive any error messages in the topic when I configure "bindings.error.destination: myErrorTopic", as suggested in the SCS documentation.

The only thing that seems to work at all is "enableDlq: true" at the binder configuration level. However, this does not let me determine the original exception, because what is posted to the DLQ only has a header containing the full stack trace. I don't want to parse a stack trace to figure out the type or message of the original exception.

Is there a better approach I should be taking here? Or is there some silly mistake I'm making? My overall goal is to send messages on exceptions with the actual Exception type and Exception messages to a DLQ.

Of course I could put try/catch statements around the whole of every Processor/Source/Sink method and then manually route to a different bound channel, but that detracts greatly from the value proposition of the SCS framework in my mind.

I found this example of custom DLQ handling as part of this earlier StackOverflow question, which would seem to meet my needs. However this doesn't seem to work at all with SCS Elmhurst/2.0 and Spring Kafka binder, even after migrating configuration to work for the updated SCS.

EDIT: I've added a GitHub repository reproducing my error, since copying the core code from Gary's answer doesn't seem to work for me. I'm starting to wonder if it's a POM dependency, configuration, or Kafka binder issue. The repo uses that same core code because it makes the issue simpler to see and debug.

EDIT AFTER GARY'S ANSWER: I've accepted Gary's answer as it solves my original problem (working around the current framework issue that occurs when there are binder environments in the config). However, the DLQ messages ended up being fairly unhelpful for my case. Once I got "errorChannel" working per Gary's answer, I subscribed to it and from there created a custom error message that I sent to a normal bound SCS channel.
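
A minimal sketch of the "custom error message" step, in plain Java with no Spring dependencies. The names ErrorSummaries, rootCause, summarize and the two map keys are hypothetical and chosen only for illustration; they are not part of Spring Cloud Stream. It assumes the handler receives the wrapped exception (the log in Gary's answer shows a MessagingException whose nested exception is the original RuntimeException) and walks the cause chain to recover the original type and message:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Spring Cloud Stream class): extracts the
// original exception type and message from a wrapped exception.
final class ErrorSummaries {

    private ErrorSummaries() {
    }

    // Follows getCause() until the innermost exception is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Builds a small key/value summary of the root cause. The key names
    // are assumptions; use whatever your downstream consumers expect.
    static Map<String, String> summarize(Throwable t) {
        Throwable root = rootCause(t);
        Map<String, String> summary = new LinkedHashMap<>();
        summary.put("exceptionType", root.getClass().getName());
        // getMessage() may be null; String.valueOf turns that into "null".
        summary.put("exceptionMessage", String.valueOf(root.getMessage()));
        return summary;
    }
}
```

Inside a handler subscribed to "errorChannel", something like ErrorSummaries.summarize(em.getPayload()) should yield the values to place in whatever message is then sent to the bound channel; the sending itself is left out here.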

Upvotes: 1

Views: 3509

Answers (1)

Gary Russell

Reputation: 174514

Why do you have return message;? Where are you expecting it to go?

This works fine for me...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So50506586Application {

    public static void main(String[] args) {
        SpringApplication.run(So50506586Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(byte[] in) {
        throw new RuntimeException("fail");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void errors(ErrorMessage em) {
        System.out.println(em);
    }

}

ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.So50506586Application#listen[1 args]; nested exception is java.lang.RuntimeException: fail, failedMessage=GenericMessage [payload=byte[3], headers={kafka_offset=0, ...

...

Caused by: java.lang.RuntimeException: fail

...

EDIT

It's a bug when using the multi-binder support (binders configured via environment properties); the global error channel is not visible.

This works...

spring:
  cloud:
      stream:
        bindings:
          input:
            destination: input
            group: myGroup7
            binder: kafka
#          error:
#            destination: errors
        kafka.bindings.input.consumer.autoCommitOnError: true
        kafka:
          binder:
            brokers: localhost:9092

(You no longer need the zkNodes with Elmhurst.)

Also, you shouldn't use the legacy error.destination properties, since the message sent to the error channel is now a rich ErrorMessage object with lots of information; use the DLQ support to publish to a topic automatically.

We need to remove those legacy error destination properties.

This doesn't work either...

@ServiceActivator(inputChannel = "input.myGroup.errors")
public void errors(ErrorMessage em) {
    System.out.println(em);
}

...because the service activator is in a different application context to the binding.

For now, the only work-around is to not use the environment style of configuration. I don't have a solution for you if you need multiple binder support.

BTW, with Elmhurst (and Kafka 0.11 or higher), the stack trace is no longer embedded in the payload of messages sent to the DLQ; it's sent as a Kafka header.

Upvotes: 1
