Vijay Shanker Dubey

Reputation: 4418

Conditional File movement in Spring Integration File Support

I am implementing a Spring Integration workflow as shown below.

IntegrationFlows.from("inputFileProcessorChannel")
                           .split(fileSplitterSpec, spec -> {})
                           .transform(lineItemTransformer)
                           .handle(httpRequestExecutingMessageHandler)
                           .transform(reportDataAggregator)
                           .aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
                           .channel("reportGeneratorChannel")
                           .get();

Once the above flow completes, I need to move the input file to an archive directory. The destination directory is chosen based on a message header, processingFailed, which is added in the .transform(reportDataAggregator) step of the flow. To move the files, I have created another flow, shown in the code below.

IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
                           .routeToRecipients(routerSpec -> {
                               routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
                                         .recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
                           })
                           .get();

The selector method:

private MessageSelector createMessageSelector(Boolean ruleBoolean) {
    return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}

The report channel flow:

IntegrationFlows.from("reportGeneratorChannel")
                           .transform(executionReportTransformer)
                           .handle(reportWritingMessageHandlerSpec)
                           .get();

But, as expected, with this setup the file is not moved, because the processingFailed header is not present in the file mover flow's execution.

So, how can I execute the file mover flow after the report file has been created?

Upvotes: 2

Views: 337

Answers (1)

Artem Bilan

Reputation: 121282

The FileSplitter populates these headers for each line it produces:

@Override
protected boolean willAddHeaders(Message<?> message) {
    Object payload = message.getPayload();
    return payload instanceof File || payload instanceof String;
}

@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
    File file = null;
    if (message.getPayload() instanceof File) {
        file = (File) message.getPayload();
    }
    else if (message.getPayload() instanceof String) {
        file = new File((String) message.getPayload());
    }
    if (file != null) {
        if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
            headers.put(FileHeaders.ORIGINAL_FILE, file);
        }
        if (!headers.containsKey(FileHeaders.FILENAME)) {
            headers.put(FileHeaders.FILENAME, file.getName());
        }
    }
}

So, even when you are done with the aggregation and are ready to send a message to that .channel("reportGeneratorChannel"), you still have access to those file-related headers.

Making reportGeneratorChannel a PublishSubscribeChannel and moving the "file mover flow" over there would do the trick for you.
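
A minimal sketch of that idea, assuming Spring Integration 5.x with the Java DSL (the IntegrationFlows factory used in the question). The class name FileMoverConfig, the bean name fileMoverFlow, and the transform step that swaps the payload for the original File are my assumptions, not something from the question. It also assumes the processingFailed header survives the aggregation, which is worth verifying when the value can differ between lines of the same file.

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
public class FileMoverConfig { // hypothetical class name

    // Declaring the channel explicitly as publish-subscribe means every flow
    // that starts from "reportGeneratorChannel" receives the aggregated message.
    @Bean
    public MessageChannel reportGeneratorChannel() {
        return MessageChannels.publishSubscribe().get();
    }

    // Second subscriber on the same channel, alongside the report flow.
    @Bean
    public IntegrationFlow fileMoverFlow() { // hypothetical bean name
        return IntegrationFlows.from("reportGeneratorChannel")
                // Assumption: the mover channels expect the input File as payload,
                // so take it from the header that FileSplitter populated.
                .transform(Message.class,
                        m -> m.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class))
                // Same routing as in the question, now fed by the aggregated message.
                .routeToRecipients(routerSpec -> routerSpec
                        .recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
                        .recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE)))
                .get();
    }

    // Selector from the question: matches on the processingFailed header.
    private MessageSelector createMessageSelector(Boolean ruleBoolean) {
        return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
    }
}
```

The existing report flow keeps starting from "reportGeneratorChannel" unchanged. If the file must be moved strictly after the report is written, the order in which the two subscribers are invoked is something to check rather than take for granted.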

By the way: what you have so far, with IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel")) and a second flow on the same channel, leads to round-robin dispatching: each message goes to only one of the two flows, in turn. That's not pub-sub distribution. See more info in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel
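
To make the difference concrete, here is a plain-Java illustration of the two dispatching strategies. It does not use Spring Integration at all; DispatchSketch and its methods are hypothetical names, and the lists simply record which subscriber would have received which message.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: these are NOT Spring Integration types.
class DispatchSketch {

    // Round-robin: each message is handed to exactly one subscriber, in rotation.
    static List<List<String>> roundRobin(int subscriberCount, List<String> messages) {
        List<List<String>> received = emptyInboxes(subscriberCount);
        int next = 0;
        for (String message : messages) {
            received.get(next).add(message);
            next = (next + 1) % subscriberCount;
        }
        return received;
    }

    // Publish-subscribe: every subscriber receives every message.
    static List<List<String>> publishSubscribe(int subscriberCount, List<String> messages) {
        List<List<String>> received = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    // One empty inbox per subscriber.
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> files = List.of("a.csv", "b.csv", "c.csv");
        System.out.println("round-robin: " + roundRobin(2, files));
        System.out.println("pub-sub:     " + publishSubscribe(2, files));
    }
}
```

With two subscribers on a round-robin channel, the first one only sees every other message, which is why two flows sharing "inputFileProcessorChannel" do not both process each file.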

Upvotes: 3
