

Spring Integration transaction strategy spanning file inbound adapter and queue channel

I have a directory that is read by an inbound file adapter, which is piped into a PriorityChannel that sorts the files by name. I created a transaction synchronization factory that moves the files once processing is done. It works fine for the inbound adapter and for all the transformations/aggregations that happen in an additional file writer flow. As soon as I add the PriorityChannel, however, the transaction seems to finish at the channel and is not passed on to the transformation/aggregation logic.

Here is the inbound flow:

return IntegrationFlows
                        c -> c.poller(Pollers.fixedDelay(period)
                .bridge(s -> s.poller(Pollers.fixedDelay(100)))

And the transaction synchronization strategy:

    TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
                "payload.renameTo(new " + " + T( +"));
                "payload.renameTo(new " + " + T( +"));
        return new DefaultTransactionSynchronizationFactory(syncProcessor);

Any idea how to make this transaction span the priority queue channel as well? Or is there another way to read the files in alphabetical order?


According to Gary, this should work (providing whole example as asked):

class FilePollingIntegrationFlow {

    public File inboundReadDirectory;

    private ApplicationContext applicationContext;

    public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                                  @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll, TaskExecutor taskExecutor,
                                                  MessageSource<File> fileReadingMessageSource) {
        return IntegrationFlows
                        c -> c.poller(Pollers.fixedDelay(period)

    TaskExecutor taskExecutor(@Value("${inbound.file.poller.thread.pool.size}") int poolSize) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        return taskExecutor;

    PseudoTransactionManager transactionManager() {
        return new PseudoTransactionManager();

    TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
                "payload.renameTo(new " + " + T( +"));
                "payload.renameTo(new " + " + T( +"));
        return new DefaultTransactionSynchronizationFactory(syncProcessor);

    public FileReadingMessageSource fileReadingMessageSource(DirectoryScanner directoryScanner) {
        FileReadingMessageSource source = new FileReadingMessageSource();
        return source;

    public DirectoryScanner directoryScanner(@Value("${inbound.filename.regex}") String regex) {
        DirectoryScanner scanner = new RecursiveDirectoryScanner();
        CompositeFileListFilter<File> filter = new CompositeFileListFilter<>(
                Arrays.asList(new AcceptOnceFileListFilter<>(), new RegexPatternFileListFilter(regex), new AlphabeticalFileListFilter()));
        return scanner;

    private class AlphabeticalFileListFilter implements FileListFilter<File> {
        public List<File> filterFiles(File[] files) {
            List<File> list = Arrays.asList(files);
            return list;

public class FilePollingConfiguration {

    public File inboundReadDirectory(@Value("${}") String path) {
        return makeDirectory(path);

    public File inboundProcessedDirectory(@Value("${inbound.processed.path}") String path) {
        return makeDirectory(path);

    public File inboundFailedDirectory(@Value("${inbound.failed.path}") String path) {
        return makeDirectory(path);

    public File inboundOutDirectory(@Value("${inbound.out.path}") String path) {
        return makeDirectory(path);

    private File makeDirectory(String path) {
        File file = new File(path);
        return file;


After doing this and removing the PriorityChannel, the transaction still does not seem to work as I would have expected. With the following flow, the file is not available in the HTTP outbound gateway. Any idea why?

public class MessageProcessingIntegrationFlow {

    public static final String OUTBOUND_FILENAME_GENERATOR = "outboundFilenameGenerator.handler";
    public static final String FILE_WRITING_MESSAGE_HANDLER = "fileWritingMessageHandler";
    public File inboundOutDirectory;

    public IntegrationFlow writeToFile(@Value("${api.base.uri}") URI uri,
                                       @Value("${out.filename.dateFormat}") String dateFormat, @Value("${out.filename.suffix}") String filenameSuffix) {
        return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
                               .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, m -> ((String) m
                                       .getHeaders()
                                       .get(FileHeaders.FILENAME)).substring(0, 17)))
                               .aggregate(a -> a.groupTimeout(2000)
                               .transform(m -> {
                                   MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
                                   //noinspection unchecked
                                   ((List<File>) m).forEach(f -> body.add("documents", new FileSystemResource((File) f)));
                                   return body;
                                                    m -> m.getHeaders()
                                                          .get(FileHeaders.FILENAME) + "_" + DateTimeFormatter.ofPattern(dateFormat)
                                                                                                                      .now()) + filenameSuffix))


Answers (2)



Thanks to Gary Russell, I came up with the following solution:

    public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                                  @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                                  @Value("${inbound.file.poller.thread.pool.size}") int poolSize,
                                                  MessageSource<File> fileReadingMessageSource) {
        return IntegrationFlows
                        c -> c.poller(Pollers.fixedDelay(period)
                .bridge(s -> s.poller(Pollers.fixedDelay(100)))

Advices with spec:

    public Advice fileMoveAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(new FunctionExpression<Message<?>>(m -> renameMultiValueMapFiles(m, this.inboundProcessedDirectory)));
        advice.setOnFailureExpression(new FunctionExpression<Message<?>>(m -> renameMultiValueMapFiles(m, this.inboundFailedDirectory)));
        return advice;

    public Consumer<GenericEndpointSpec<HttpRequestExecutingMessageHandler>> outboundSpec() {
        return new Consumer<GenericEndpointSpec<HttpRequestExecutingMessageHandler>>() {
            public void accept(GenericEndpointSpec<HttpRequestExecutingMessageHandler> spec) {
                spec.advice(fileMoveAdvice(), retryAdvice());

    private boolean renameMultiValueMapFiles(Message<?> m, File directory) {
        MultiValueMap<String, Resource> files = (MultiValueMap<String, Resource>) m.getPayload();
        List<File> list = new ArrayList<>();
        // no lambda to avoid ThrowsFunction type
        for (List<Resource> l : files.values()) {
            for (Resource v : l) {
        list.forEach(v -> v.renameTo(new File(directory.getPath(), v.getName())));
        return true;

Added spec to handle:

                            .expectedResponseType(byte[].class), this.advices.outboundSpec())
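
One detail worth noting in `renameMultiValueMapFiles` above: the boolean returned by `File.renameTo` is discarded and the method always returns `true`, so a failed move goes unnoticed. The following is a minimal sketch of a stricter variant using `java.nio.file.Files.move`, which throws when the move fails. The class name `FileMover`, the method `moveAll`, and the choice of `REPLACE_EXISTING` are assumptions for illustration and are not part of the original solution.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.List;

public class FileMover {

    // Hypothetical helper: moves every file into the given directory and
    // fails loudly if any single move cannot be completed.
    static void moveAll(List<File> files, File directory) {
        for (File file : files) {
            Path target = directory.toPath().resolve(file.getName());
            try {
                // Assumption: an existing file with the same name may be overwritten.
                Files.move(file.toPath(), target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                throw new UncheckedIOException("Could not move " + file, e);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Temporary directories stand in for the inbound and processed directories.
        Path inbound = Files.createTempDirectory("inbound");
        Path processed = Files.createTempDirectory("processed");
        File file = Files.createFile(inbound.resolve("a.txt")).toFile();

        moveAll(Collections.singletonList(file), processed.toFile());
        System.out.println("moved: " + Files.exists(processed.resolve("a.txt")));
    }
}
```

Because the exception propagates out of the helper, a caller such as an advice expression would see the failure instead of a silent `true`.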


Gary Russell

You cannot switch threads with Spring transactions; the transaction is bound to the thread.
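
To illustrate why a thread hand-off loses the transaction: Spring keeps the active transaction's synchronization state in thread-local storage, and a queue-backed channel such as PriorityChannel hands each message to a separate poller thread. The sketch below reproduces only the thread-local effect in plain Java. `ThreadBoundStateDemo` and its methods are hypothetical stand-ins, not Spring classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundStateDemo {

    // Hypothetical stand-in for per-thread transaction state.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Binds a transaction id to the calling thread only.
    static void bind(String txId) {
        CURRENT_TX.set(txId);
    }

    static String readOnCurrentThread() {
        return CURRENT_TX.get();
    }

    // Reads the state from a different thread, as a downstream poller would.
    static String readOnOtherThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = CURRENT_TX::get;
            return executor.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        bind("tx-1");
        System.out.println("same thread:  " + readOnCurrentThread()); // prints "same thread:  tx-1"
        System.out.println("other thread: " + readOnOtherThread());   // prints "other thread: null"
        CURRENT_TX.remove();
    }
}
```

The second thread sees nothing, which matches the behavior in the question: everything downstream of the queue channel runs outside the poller's transaction.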

You can use a custom FileListFilter in the message source instead and sort the files there.
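
A minimal sketch of the sorting step, written in plain Java so it runs without Spring on the classpath. In the question's `AlphabeticalFileListFilter`, this logic would form the body of `filterFiles(File[] files)`. The class name `AlphabeticalFileSort` and the method `sortByName` are assumptions for illustration.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class AlphabeticalFileSort {

    // Returns the files ordered by file name. The input is copied first,
    // because Arrays.asList returns a view backed by the array and sorting
    // that view would reorder the caller's array as well.
    static List<File> sortByName(File[] files) {
        List<File> sorted = new ArrayList<>(Arrays.asList(files));
        sorted.sort(Comparator.comparing(File::getName));
        return sorted;
    }

    public static void main(String[] args) {
        File[] files = { new File("b.txt"), new File("c.txt"), new File("a.txt") };
        for (File file : sortByName(files)) {
            System.out.println(file.getName());
        }
    }
}
```

Since the sort happens inside the message source, the files are emitted in order on the poller thread and no queue channel is needed in between.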
