Abhijit Sarkar

Reputation: 24637

Spring Integration: MessageSource doesn't honor errorChannel header

I have the following flow:

@Resource(name = S3_CLIENT_BEAN)
private MessageSource<InputStream> messageSource;

public IntegrationFlow fileStreamingFlow() {
    return IntegrationFlows.from(s3Properties.getFileStreamingInputChannel())
        .enrichHeaders(spec -> spec.header(ERROR_CHANNEL, S3_ERROR_CHANNEL, true))
        .handle(String.class, (fileName, h) -> {
            if (messageSource instanceof S3StreamingMessageSource) {
                S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;

                ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
                chainFileListFilter.addFilters(...);
                s3StreamingMessageSource.setFilter(chainFileListFilter);

                return s3StreamingMessageSource.receive();
            }
            return messageSource.receive();
        }, spec -> spec
            .requiresReply(false) // in case all messages got filtered out
        )
        .channel(s3Properties.getFileStreamingOutputChannel())
        .get();
}

I found that if s3StreamingMessageSource.receive() throws an exception, the error ends up in the error channel configured for the previous flow in the pipeline, not in the S3_ERROR_CHANNEL configured for this flow. I'm not sure whether it's related to this question.

Upvotes: 1

Views: 294

Answers (1)

Artem Bilan

Reputation: 121560

s3StreamingMessageSource.receive() is called from SourcePollingChannelAdapter:

protected Message<?> receiveMessage() {
    return this.source.receive();
}

That method, in turn, is called from AbstractPollingEndpoint:

private boolean doPoll() {

        message = this.receiveMessage();
...

        this.handleMessage(message);
...
}

That handleMessage() does this:

this.messagingTemplate.send(getOutputChannel(), message);

So the exception is thrown well before the message reaches the .enrichHeaders(spec -> spec.header(ERROR_CHANNEL, S3_ERROR_CHANNEL, true)) step downstream, and the errorChannel header set there is never consulted.
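
To make the ordering concrete, here is a minimal plain-Java model of one poll cycle. It is only a sketch: PollerErrorRoutingSketch, its poll() method and the channel names "pollerErrors" and "output" are hypothetical stand-ins, not Spring Integration classes or the framework's actual implementation. It assumes that a failure in receive() can only be routed by the poller's own error channel setting, because no message (and so no header) exists yet, while a failure after header enrichment can be routed by the header.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified model only: none of these types are Spring Integration classes.
class PollerErrorRoutingSketch {

    // Records what each named "channel" received, in arrival order.
    final Map<String, List<String>> channels = new HashMap<>();

    private void send(String channel, String payload) {
        channels.computeIfAbsent(channel, k -> new ArrayList<>()).add(payload);
    }

    // Models the downstream enrichHeaders(...) step: it needs an existing message.
    private Map<String, String> enrichHeaders() {
        Map<String, String> headers = new HashMap<>();
        headers.put("errorChannel", "s3ErrorChannel");
        return headers;
    }

    // One poll cycle: receive first, then enrich headers and run the handler.
    void poll(Supplier<String> source, Consumer<String> handler, String pollerErrorChannel) {
        String payload;
        try {
            payload = source.get();
        } catch (RuntimeException e) {
            // No message and no headers exist yet, so only the poller's setting applies.
            send(pollerErrorChannel, e.getMessage());
            return;
        }
        Map<String, String> headers = enrichHeaders();
        try {
            handler.accept(payload);
            send("output", payload);
        } catch (RuntimeException e) {
            // A message with headers exists here, so the header can pick the channel.
            send(headers.get("errorChannel"), e.getMessage());
        }
    }

    public static void main(String[] args) {
        PollerErrorRoutingSketch sketch = new PollerErrorRoutingSketch();
        // Failure while receiving: routed by the poller's error channel.
        sketch.poll(() -> { throw new IllegalStateException("receive failed"); },
                payload -> { }, "pollerErrors");
        // Failure after enrichment: routed by the errorChannel header.
        sketch.poll(() -> "file.txt",
                payload -> { throw new IllegalStateException("handler failed"); },
                "pollerErrors");
        System.out.println(sketch.channels);
    }
}
```

In this model the first poll never reaches enrichHeaders(), which mirrors why the header has no effect on an exception thrown by receive().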

However, you can still catch the exception in that S3_ERROR_CHANNEL. Pay attention to the second argument of IntegrationFlows.from():

IntegrationFlows.from(s3Properties.getFileStreamingInputChannel(),
           e -> e.poller(Pollers.fixedDelay(...)
                       .errorChannel(S3_ERROR_CHANNEL)))

Or, judging by your current configuration, you have a global poller somewhere, so configure an errorChannel there.

Upvotes: 1
