Reputation: 377
I have a Spring Integration (SI) flow with a splitter that creates blockMessages and an aggregator further down the flow that rejoins them. In between, every blockMessage created by the splitter goes through some processing: a file is downloaded and saved, and then a remote notification is sent. Unfortunately this flow fails for some blockMessages, so I thought I would create a fallback error flow which, for the moment, only saves the error to the database and then returns control to the aggregator so it can finish the job.
Is this a correct approach (returning to the main flow), and how can I get the aggregator to treat messages routed through the error flow as normal messages? At the moment the problem is that the messages coming back from the error flow differ from the messages that follow the normal flow, so the aggregator doesn't handle them properly and never knows when to finish. Or can you recommend another approach for dealing with these failing messages?
@Bean
public IntegrationFlow inFlow() {
    return IntegrationFlows
            .from(inbound)
            .split(notificationSplitter)
            .channel(inChannel)
            .get();
}

@Bean
@DependsOn("downloadFlowError") //download error
public IntegrationFlow downloadFlow() {
    return IntegrationFlows
            .from(inChannel)
            .enrichHeaders(s -> s.headerExpressions(c -> c.put("block", "payload")))
            .handle(saveToSmb)
            .channel(notifyChannel)
            .get();
}

@Bean
public IntegrationFlow notifyFlow() {
    return IntegrationFlows
            .from(notifyChannel)
            .handle(blockHandler)
            .headerFilter("contentType")
            .enrichHeaders(Collections.singletonMap("contentType", APPLICATION_JSON_VALUE))
            .handle(outboundHttpNotifier)
            .channel(resultsChannel)
            .get();
}

@Bean
public IntegrationFlow resultsFlow() {
    return IntegrationFlows
            .from(resultsChannel)
            .aggregate(aggregator)
            .get();
}

@Bean
public IntegrationFlow downloadFlowError() {
    return IntegrationFlows
            .from(errorChannel)
            .handle(errorHandler)
            .channel(resultsChannel)
            .get();
}
The exception is thrown in the saveToSmb component.
The aggregator looks like this, though it now also needs a ReleaseStrategy:
@Component
public class BlockAggregator implements CorrelationStrategy, MessageGroupProcessor {

    @Override
    public Object getCorrelationKey(Message<?> message) {
        return message.getHeaders().get("correlationId");
    }

    @Override
    @Transactional
    public String processMessageGroup(MessageGroup group) {
        // Any message of the group will do: they all carry the same block header.
        Message<?> message = group.getMessages().iterator().next();
        BlockMessage m = Objects.requireNonNull(message.getHeaders().get(BLOCK_MESSAGE_KEY, BlockMessage.class));
        ...
        log.info("finished processing event");
        return "OK";
    }
}
PS: I have also added the ErrorHandler, shown below. What it returns reaches the aggregator as a different type of message than the regular flow produces, because the regular flow is longer and does more processing, so I guess I should adapt the error messages as well:
@Component
public class ErrorHandler implements GenericHandler<MessageHandlingException> {

    @Override
    public Object handle(MessageHandlingException payload, MessageHeaders headers) {
        BlockMessage blockMessage = (BlockMessage) payload.getFailedMessage().getPayload();
        ....
        return payload.getFailedMessage();
    }
}
Upvotes: 1
Views: 31
Reputation: 121552
Your logic of sending the error handler's result to the aggregator is correct. The only thing you are missing is that the error channel receives an ErrorMessage. It is not clear what you do in your errorHandler, but the recommendation is to cast the payload of that ErrorMessage to a MessagingException and take its failedMessage for further processing. The important part there is the headers, which carry all the required information about correlation and/or the replyChannel. You can certainly build a new message in the error handler as a compensation, but be sure to preserve the headers of that failedMessage.
More info is here: https://github.com/spring-projects/spring-integration/issues/3984
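To illustrate why preserving the headers of the failedMessage matters, here is a minimal plain-Java sketch. It deliberately does not use Spring Integration: Msg, FailedException, ToyAggregator and the "failed" marker header are hypothetical stand-ins invented for this example, and the toy aggregator only assumes the usual splitter behaviour of correlating on a correlationId header and releasing once sequenceSize messages have arrived. In a real flow the header copy would typically be done with something like MessageBuilder.withPayload(...).copyHeaders(failedMessage.getHeaders()).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ErrorCompensationSketch {

    /** Hypothetical stand-in for a Message: a payload plus read-only headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    /** Hypothetical stand-in for MessagingException: remembers the message that failed. */
    static final class FailedException extends RuntimeException {
        final Msg failedMessage;

        FailedException(String reason, Msg failedMessage) {
            super(reason);
            this.failedMessage = failedMessage;
        }
    }

    /** Error-flow step: unwrap the failed message and keep all of its headers. */
    static Msg compensate(Msg errorMessage) {
        FailedException cause = (FailedException) errorMessage.payload;
        Msg failed = cause.failedMessage;
        Map<String, Object> headers = new HashMap<>(failed.headers);
        headers.put("failed", Boolean.TRUE); // hypothetical marker for the aggregator
        return new Msg(failed.payload, headers);
    }

    /** Toy aggregator: groups by correlationId, releases at sequenceSize messages. */
    static final class ToyAggregator {
        private final Map<Object, List<Msg>> groups = new HashMap<>();

        /** Returns the completed group, or null while the group is still incomplete. */
        List<Msg> offer(Msg message) {
            Object key = message.headers.get("correlationId");
            List<Msg> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
            group.add(message);
            int expected = (Integer) message.headers.get("sequenceSize");
            return group.size() >= expected ? groups.remove(key) : null;
        }
    }

    /** One block succeeds, one fails and comes back through the error flow. */
    static List<Msg> demo() {
        Map<String, Object> splitHeaders = new HashMap<>();
        splitHeaders.put("correlationId", "evt-1");
        splitHeaders.put("sequenceSize", 2);
        Msg first = new Msg("block-1", splitHeaders);
        Msg second = new Msg("block-2", splitHeaders);

        ToyAggregator aggregator = new ToyAggregator();
        aggregator.offer(first); // normal path, group still incomplete

        // The error message itself carries no correlation headers, only the exception.
        Msg error = new Msg(new FailedException("download failed", second),
                Collections.<String, Object>emptyMap());
        return aggregator.offer(compensate(error));
    }

    public static void main(String[] args) {
        List<Msg> released = demo();
        System.out.println("released " + released.size() + " messages"); // released 2 messages
    }
}
```

Because the compensation message copies the correlation headers from the failed message, the toy aggregator counts it toward the same group and can release. Had the error message been forwarded as-is, the group would never complete.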
Upvotes: 1