onkami
onkami

Reputation: 9411

Spring Integration Java DSL: buffering the message flow and put handler in separate thread

I have a flow configured in Spring Integration DSL:

// A custom channel Bean 
@Autowired
@Qualifier(INPUT_DATA_CHANNEL)
private PublishSubscribeChannel publishSubscribeChannel;

//A Service that can do database recording
@Autowired
private DatabaseActivator databaseActivator;

@Bean
public IntegrationFlow setupDatabaseFlow() {

    return IntegrationFlows.from(publishSubscribeChannel)
            .handle((p, h) -> databaseActivator.recordToDatabase(p))
            .get();
}

According to logs, everything happens sequentially in thread "main". BTW, I use publishSubscribeChannel as in parallel I have rabbit publisher/handler that listens in the same way to this channel.

Since Database operation takes time, how I should properly approach the handling so "main" is not slowed down. Preferably, main thread must be unblocked asap and handling should continue in a worker thread. Am I correct?

Can I introduce a buffer in the Flow, that will collect bursts of messages from the publishSubscribeChannel?

Also, I prefer other thread (pool) to handle actual send in order to remove load from the main thread which is executing the flow. I am well aware of ThreadPoolTaskExecutor in Spring that both has a buffer and a thread pool. Is it a good way to use it, and how employ ThreadPoolTaskExecutor in Java DSL way?

Upvotes: 0

Views: 457

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121282

There is the ctor on the matter:

/**
 * Create a PublishSubscribeChannel that will use an {@link Executor}
 * to invoke the handlers. If this is null, each invocation will occur in
 * the message sender's thread.
 *
 * @param executor The executor.
 */
public PublishSubscribeChannel(Executor executor) {

http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel-configuration-pubsubchannel

With Java DSL you can declare it like:

@Bean
PublishSubscribeChannel publishSubscribeChannel(Executor executor) {
    return Channels.publishSubscribe(executor).get();
}

Upvotes: 0

Related Questions