user2953113
user2953113

Reputation: 569

PublishSubscribeChannel using TaskExecutor - Thread behaviour

I have a simple spring dsl flow as follows:

@Configuration
public class OrderFlow {

private static final Logger logger = LoggerFactory.getLogger(OrderFlow.class);

@Autowired
private OrderSubFlow orderSubFlow;

@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Bean
public IntegrationFlow orders() {

    return IntegrationFlows.from(MessageChannels.direct("order_input").get()).handle(new GenericHandler<Order>() {
        @Override
        public Object handle(Order order, Map<String, Object> headers) {
            logger.info("Pre-Processing order with id: {}", order.getId());
            return MessageBuilder.withPayload(order).copyHeaders(headers).build();
        }
    }).publishSubscribeChannel(threadPoolTaskExecutor, new Consumer<PublishSubscribeSpec>() {
        @Override
        public void accept(PublishSubscribeSpec t) {
            t.subscribe(orderSubFlow);
        }
    }).handle(new GenericHandler<Order>() {
        @Override
        public Object handle(Order order, Map<String, Object> headers) {
            logger.info("Post-Processing order with id: {}", order.getId());
            return MessageBuilder.withPayload(order).copyHeaders(headers).build();
        }
    }).get();

}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setMaxPoolSize(2);
    threadPoolTaskExecutor.setCorePoolSize(2);
    threadPoolTaskExecutor.setQueueCapacity(10);
    return threadPoolTaskExecutor;
}

}

And the OrderSubFlow is

@Configuration
public class OrderSubFlow implements IntegrationFlow {

private static final Logger logger = LoggerFactory.getLogger(OrderSubFlow.class);

@Override
public void configure(IntegrationFlowDefinition<?> flow) {
    flow.handle(new GenericHandler<Order>() {
        @Override
        public Object handle(Order order, Map<String, Object> headers) {
            logger.info("Processing order with id: {}", order.getId());
            return null;
        }
    });

}

}

When I put a message into the "order_input" channel, it's executing the first OrderFlow handler in the main thread and OrderSubFlow handler in TaskExecutor thread, which is expected. But the OrderFlow second handler is also getting executed in TaskExecutor thread. Is this an expected behaviour? Shouldn't OrderFlow second handler be executed in the main thread itself?

Please see the logs below.

INFO 9648 --- [           main] com.example.flows.OrderFlow              : Pre-Processing order with id: 10
INFO 9648 --- [lTaskExecutor-1] com.example.flows.OrderSubFlow           : Processing order with id: 10
INFO 9648 --- [lTaskExecutor-2] com.example.flows.OrderFlow              : Post-Processing order with id: 10

Here is the gateway I'm using

@MessagingGateway
public interface OrderService {

@Gateway(requestChannel="order_input")
Order processOrder(Order order);

}

Upvotes: 1

Views: 364

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121272

Please, read a discussion in the https://jira.spring.io/browse/INT-4264. That is really expected behavior. Just because that handler is one more subscriber to that publishSubscribeChannel.

To make what you want is possible with the .routeToRecipients() when one of the recipients is pub-sub with Executor, and another is DirectChannel to continue in the main thread.

Upvotes: 1

Related Questions