Reputation: 33
I'm trying to create an application-level error handler for failures during processing inside a Spring Cloud Stream (SCS) application that uses Kafka as the message broker. I know that SCS already provides DLQ functionality, but in my case I want to wrap failed messages in a custom wrapper type that carries the failure context (source, cause, etc.).
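To make the idea of the wrapper concrete, here is a minimal plain-Java sketch of what such a type might look like. The class name `ErrorWrapper` matches the type that shows up in the consumer output further down, but the fields, the `of` factory, and the root-cause unwrapping are assumptions made for illustration, not the code from the linked repository. Spring types are deliberately left out so the snippet stands alone.

```java
import java.time.Instant;
import java.util.Objects;

/**
 * Hypothetical wrapper for a failed message. Field names and the factory
 * method are assumptions, not taken from the linked repository.
 */
final class ErrorWrapper {
    private final String source;    // where the failure happened, e.g. the input destination
    private final String cause;     // root-cause exception class and message
    private final String payload;   // original payload rendered as text
    private final Instant failedAt; // when the wrapper was created

    private ErrorWrapper(String source, String cause, String payload, Instant failedAt) {
        this.source = source;
        this.cause = cause;
        this.payload = payload;
        this.failedAt = failedAt;
    }

    /** Builds a wrapper from the failed payload and the error raised while processing it. */
    static ErrorWrapper of(String source, Object payload, Throwable error) {
        Objects.requireNonNull(error, "error");
        // Walk the cause chain so the wrapper reports the innermost exception,
        // not the framework exception that was wrapped around it.
        Throwable root = error;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        String cause = root.getClass().getName() + ": " + root.getMessage();
        return new ErrorWrapper(source, cause, String.valueOf(payload), Instant.now());
    }

    String getSource() { return source; }
    String getCause() { return cause; }
    String getPayload() { return payload; }
    Instant getFailedAt() { return failedAt; }

    @Override
    public String toString() {
        return "ErrorWrapper{source=" + source + ", cause=" + cause
                + ", payload=" + payload + ", failedAt=" + failedAt + "}";
    }
}
```

An error handler would then build the wrapper with something like `ErrorWrapper.of("testIn", payload, exception)` and send the result to the custom DLQ destination.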
In https://github.com/qabbasi/Spring-Cloud-Stream-DLQ-Error-Handling you can see two approaches for this scenario: one uses SCS and the other uses Spring Integration directly. (Neither is working at the moment.)
According to the current reference (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producing_and_consuming_messages), SCS allows publishing error messages received from the Spring Integration error channel. Unfortunately this is not the case, at least for me, even though the application logs the following on startup:
o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 2 subscriber(s).
Upvotes: 0
Views: 2580
Reputation: 174514
You shouldn't use @StreamListener("errorChannel") - that is consuming from a binder destination; to capture messages sent to the errorChannel, use @ServiceActivator(inputChannel = "errorChannel").
EDIT
There were several problems with your app...
- autoCommitOnError is a Kafka binder property
- @EnableBinding(CustomDlqMessageChannel.class)
- @EnableIntegration - Boot does that for you
See my commit here.
and...
$ kafka-console-producer --broker-list localhost:9092 --topic testIn
>foo
and...
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic customDlqTopic --from-beginning
?
contentType>"application/x-java-object;type=com.example.demo.ErrorWrapper"
Upvotes: 2