selvinsource

Reputation: 1837

Spring Integration: how to process multiple messages at one time?

I have the following configuration:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
    <int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>

And the message handler defined as

@Override
public void handleMessage(Message<?> message) throws MessagingException {
    Object payload = message.getPayload();
    if (payload instanceof LogEntry) {
        LogEntry logEntry = (LogEntry) payload;
        String app = (String) message.getHeaders().get("app");
        logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
        logEntryPostProcessService.postProcess(app, logEntry);
    } else {
        throw new MessageRejectedException(message, "Unknown data type has been received.");
    }
}

What I would like to have is something like

@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}

so that the poller delivers all 10 messages in a single call instead of invoking the method 10 times, once per message.

The reason for this is to be able to bulk-process all the messages in one chunk, thereby improving performance.
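To illustrate why batching helps, here is a minimal, framework-free sketch of chunked bulk processing. The class name `BatchSketch`, the `bulkProcess` method, and the simplified `LogEntry` record are illustrative stand-ins, not part of the question's actual code; a real implementation would issue one batched database write per chunk.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the real LogEntry class, for illustration only.
record LogEntry(String entityType, String action) {}

public class BatchSketch {

    // One bulk call per chunk instead of one call per entry;
    // returns the number of bulk calls made.
    static int bulkProcess(List<LogEntry> entries, int chunkSize) {
        int calls = 0;
        for (int i = 0; i < entries.size(); i += chunkSize) {
            List<LogEntry> chunk =
                    entries.subList(i, Math.min(i + chunkSize, entries.size()));
            // a real implementation would issue a single batched DB write for 'chunk' here
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<LogEntry> entries = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            entries.add(new LogEntry("order", "create"));
        }
        System.out.println(bulkProcess(entries, 10)); // 25 entries in chunks of 10 -> 3 calls
    }
}
```

With 25 entries and a chunk size of 10, only 3 bulk calls are made instead of 25 per-message calls, which is the performance gain the question is after.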

Upvotes: 9

Views: 6465

Answers (2)

selvinsource

Reputation: 1837

Thanks to @Artem Bilan, here is the final solution:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- 
    the poller will process up to 100 messages every minute;
    when a group reaches 100 messages (the poll hit max-messages-per-poll)
    or the 60-second group timeout expires (the poll delivered fewer than 100 messages),
    a message whose payload is the list of grouped payloads is sent to the output channel
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
    send-partial-result-on-expiry="true"
    group-timeout="60000"
    correlation-strategy-expression="T(Thread).currentThread().id"
    release-strategy-expression="size() == 100">
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>

<int:channel id="logEntryAggrChannel"/>        

<!-- the payload is a list of log entries as result of the aggregator -->
<int:outbound-channel-adapter channel="logEntryAggrChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>

As per the comment in the code above, I had to set group-timeout/send-partial-result-on-expiry because some groups were formed under a thread ID but never released, since they never reached the size() == 100 condition.
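After the aggregator, logEntryPostProcessorReceiver receives a single message whose payload is the list of aggregated payloads. The following is a framework-free sketch of that unwrapping step; the names `AggregatedHandlerSketch` and `handlePayload` and the simplified `LogEntry` record are assumptions for illustration, while the real handler would receive a Spring Integration `Message` and read this list from `getPayload()`.

```java
import java.util.List;

// Stand-in for the real application type; for illustration only.
record LogEntry(String entityType, String action) {}

public class AggregatedHandlerSketch {

    // The <aggregator> emits one message whose payload is the List of the
    // original payloads, so the handler unwraps the list once and can
    // post-process the whole batch in a single call.
    @SuppressWarnings("unchecked")
    static int handlePayload(Object payload) {
        if (!(payload instanceof List)) {
            throw new IllegalArgumentException("Expected the aggregated List payload");
        }
        List<LogEntry> logEntries = (List<LogEntry>) payload;
        // bulk post-processing of logEntries would go here
        return logEntries.size();
    }

    public static void main(String[] args) {
        System.out.println(handlePayload(List.of(
                new LogEntry("order", "create"),
                new LogEntry("order", "update"))));
    }
}
```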

Upvotes: 4

Artem Bilan

Reputation: 121560

That's true, because of this loop in AbstractPollingEndpoint:

taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        int count = 0;
        while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
...
            if (!pollingTask.call()) {
                break;
            }
...
    }
});

Hence all your messages (up to max-messages-per-poll) are handled within the same thread. However, they are sent to the handler one by one, not as a single batch.

To process them in parallel you should use an ExecutorChannel before your logEntryPostProcessorReceiver. Something like this:

<channel id="executorChannel">
   <dispatcher task-executor="threadPoolExecutor"/>
</channel>

<bridge input-channel="logEntryChannel" output-channel="executorChannel">
   <poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>

<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>

UPDATE

To process messages as one batch you should aggregate them. Since they are all the result of a polling endpoint, the messages have no sequenceDetails headers. You can work around that with a fake value for the correlationId:

<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
        release-strategy-expression="size() == 10"/>

Where size() == 10 should equal max-messages-per-poll.

After that, your logEntryPostProcessorReceiver has to accept a single message whose payload is the list of payloads produced by the <aggregator>.

Upvotes: 6
