user2095692

Reputation: 33

Spring Cloud Stream (Kafka) parameterize specified error channel {destination}.{group}.errors

I am trying to find out whether the error channel I pass to @ServiceActivator can be parameterized from a value specified in YAML, instead of hardcoding the actual destination and consumer group in the code itself.

@ServiceActivator(
    // I do not want to hardcode destination and consumer group here
    inputChannel = "stream-test-topic.my-consumer-group.errors"
)
public void handleError(ErrorMessage errorMessage) {
    // Getting exception objects
    Throwable errorMessagePayload = errorMessage.getPayload();
    log.error("exception occurred", errorMessagePayload);

    // Get message body
    Message<?> originalMessage = errorMessage.getOriginalMessage();
    if (originalMessage != null) {
        log.error("Message Body: {}", originalMessage.getPayload());
    } else {
        log.error("The message body is empty");
    }
}

Upvotes: 1

Views: 453

Answers (1)

Gary Russell

Reputation: 174809

You can't do that with @ServiceActivator; use the Java DSL instead:

@Value("${error.channel}")
String errors;

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(this.errors)
            .handle(msg -> {
                System.out.println(msg);
            })
            .get();
}

And set the property in your YAML:

error:
  channel: stream-test-topic.my-consumer-group.errors
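
If you would rather not repeat the destination and group inside one combined property, the channel name could also be assembled from the two values, following the {destination}.{group}.errors pattern from the question title. The sketch below is plain Java with no Spring dependency; `ErrorChannelNames` and its `of` method are hypothetical names made up for illustration, not part of Spring Cloud Stream or Spring Integration.

```java
import java.util.Objects;

// Hypothetical helper (not a Spring class): builds the binding-specific
// error channel name using the {destination}.{group}.errors pattern.
final class ErrorChannelNames {

    private ErrorChannelNames() {
        // static utility, not meant to be instantiated
    }

    static String of(String destination, String group) {
        Objects.requireNonNull(destination, "destination");
        Objects.requireNonNull(group, "group");
        // Reject empty or whitespace-only values, which would yield a malformed name
        if (destination.trim().isEmpty() || group.trim().isEmpty()) {
            throw new IllegalArgumentException("destination and group must not be blank");
        }
        return destination + "." + group + ".errors";
    }
}
```

In the flow above, `ErrorChannelNames.of(destination, group)` would take the place of `this.errors`, assuming `destination` and `group` are injected with `@Value` from whichever property keys your YAML already uses for the binding.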

Upvotes: 2
