Reputation: 17461
Currently I am keeping track of active threads that in process due to not letting system shutdown until I do not have any procesing threads
For example
package com.example.demo.flow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.concurrent.Executors;
/**
* Created by on 03/01/2020.
*/
@Component
@Slf4j
public class TestFlow {
@Bean
public StandardIntegrationFlow errorChannelHandler() {
return IntegrationFlows.from("testChannel")
.handle(o -> {
log.info("Handling error....{}", o);
}).get();
}
@Bean
public IntegrationFlow testFile() {
IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
.errorChannel("testChannel")))
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
.transform(o -> {
throw new RuntimeException("Failing on purpose");
}).handle(o -> {
});
return testChannel.get();
}
}
I have enabled multiple file for integration flow but in Error Handler the thread is different How can I know which thread did it come from?
Is there anyway I can find out as this is very critical
Upvotes: 1
Views: 172
Reputation: 121272
According your current configuration the testChannel
is a DrectChannel
, so whatever you send to it is going to be processed on a thread your send from.
Therefore the Thread.currentThread()
is enough for your to determine it.
For more general solution consider to have a MessagePublishingErrorHandler
as a bean with the ChannelUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME
to override a default one. This MessagePublishingErrorHandler
can be supplied with a custom ErrorMessageStrategy
. There, when you create an ErrorMessage
, you can add a custom header with the same Thread.currentThread()
info to carry onto that error channel processing even if it is done in a separate thread.
You also could just throw an exception with that info, too, instead!
Upvotes: 1