Makky
Makky

Reputation: 17461

Spring Integration DSL Error Handler Thread ID

Currently I am keeping track of active threads that in process due to not letting system shutdown until I do not have any procesing threads

For example

package com.example.demo.flow;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.Executors;

/**
 * Created by on 03/01/2020.
 */
@Component
@Slf4j
public class TestFlow {

    @Bean
    public StandardIntegrationFlow errorChannelHandler() {

        return IntegrationFlows.from("testChannel")
                .handle(o -> {

                    log.info("Handling error....{}", o);
                }).get();
    }

    @Bean
    public IntegrationFlow testFile() {


        IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
                        .errorChannel("testChannel")))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .transform(o -> {

                    throw new RuntimeException("Failing on purpose");

                }).handle(o -> {
                });

        return testChannel.get();


    }


}

I have enabled multiple file for integration flow but in Error Handler the thread is different How can I know which thread did it come from?

Is there anyway I can find out as this is very critical

Upvotes: 1

Views: 172

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121272

According your current configuration the testChannel is a DrectChannel, so whatever you send to it is going to be processed on a thread your send from. Therefore the Thread.currentThread() is enough for your to determine it.

For more general solution consider to have a MessagePublishingErrorHandler as a bean with the ChannelUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME to override a default one. This MessagePublishingErrorHandler can be supplied with a custom ErrorMessageStrategy. There, when you create an ErrorMessage, you can add a custom header with the same Thread.currentThread() info to carry onto that error channel processing even if it is done in a separate thread.

You also could just throw an exception with that info, too, instead!

Upvotes: 1

Related Questions