Reputation: 4418
I am implementing one Spring Integration work flow as below.
IntegrationFlows.from("inputFileProcessorChannel")
.split(fileSplitterSpec, spec -> {})
.transform(lineItemTransformer)
.handle(httpRequestExecutingMessageHandler)
.transform(reportDataAggregator)
.aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
.channel("reportGeneratorChannel")
.get();
Now, once the above flow is completed, I need to move the input file
to a archive directory. The decision to decide on the destination directory is based on a message header processingFailed
and this header is added in .transform(reportDataAggregator)
step in the flow. To move this files I have create another flow as in below code
IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
.routeToRecipients(routerSpec -> {
routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
.recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
})
.get();
Selector method
private MessageSelector createMessageSelector(Boolean ruleBoolean) {
return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}
Report Channel flow below
IntegrationFlows.from("reportGeneratorChannel")
.transform(executionReportTransformer)
.handle(reportWritingMessageHandlerSpec)
.get();
But, as expected with this flow, File movement is not done as the said header is not present into the flow execution.
So, How to achieve this goal to Execute the file mover flow
after the report file is created?
Upvotes: 2
Views: 337
Reputation: 121282
The FileSplitter
populates for us this headers for each line to produce:
@Override
protected boolean willAddHeaders(Message<?> message) {
Object payload = message.getPayload();
return payload instanceof File || payload instanceof String;
}
@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
File file = null;
if (message.getPayload() instanceof File) {
file = (File) message.getPayload();
}
else if (message.getPayload() instanceof String) {
file = new File((String) message.getPayload());
}
if (file != null) {
if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
headers.put(FileHeaders.ORIGINAL_FILE, file);
}
if (!headers.containsKey(FileHeaders.FILENAME)) {
headers.put(FileHeaders.FILENAME, file.getName());
}
}
}
So, even if you done with an aggregation and ready to send a message into that .channel("reportGeneratorChannel")
, you still have access to those file-related headers.
Making this reportGeneratorChannel
as a PublishSubscribeChannel
and move that "file mover flow" over there, would do the trick for you.
By the way: what you have so far with a IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
and a second flow on the same channel would lead you to round-robin dispatching. That's not pub-sub distribution. See more info in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel
Upvotes: 3