Rayyan
Rayyan

Reputation: 173

Spring Integration ExecutorChannel running on Same Caller Thread

Need to process messages sequentially, irrespective of the spring.task.scheduling.pool.size threads defined. Hence, we defined a ExecutorChannel with single thread. However, we see the messages are processed parallelly by the caller's thread. Please suggest how to process the messages sequentially without blocking the caller thread.

@Bean
public MessageChannel svcErrorChannel() {
   return new ExecutorChannel(Executors.newSingleThreadExecutor());
}

return IntegrationFlows.from(svcErrorChannel())                                             
                       .log(ERROR, m -> "ErrorFlow Initiated: " + m.getPayload())
                

Application Logs:

2023-02-04 20:21:03,407 [boundedElastic-1          ] ERROR o.s.i.h.LoggingHandler - 1c710133ada428f0 ErrorFlow Initiated: org.springframework.messaging.MessageHandlingException: xxxxxxxxxxxxxxxx
2023-02-04 20:21:03,407 [boundedElastic-2          ] ERROR o.s.i.h.LoggingHandler - 1c710133ada428f0 ErrorFlow Initiated: org.springframework.messaging.MessageHandlingException: xxxxxxxxxxxxxxxxx

Upvotes: 0

Views: 129

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121560

The log() operator is essentially a ChannelInterceptor where that logging happens in a preSend() hook - the part of a MessageChannel on producer side. Therefore it is expected to see your Reactor threads in those logs.

If you really would like to log consumed (not produced) messages, then you need to use a .handle(new LoggingHandler()) instead of log() operator. Or you can use a .bridge() before your log().

See docs for more info: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-log

Upvotes: 1

Related Questions