Reputation: 1837
I have the following configuration:
<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store -->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- the poller will process up to 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
    <int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>
The message handler is defined as:
@Override
public void handleMessage(Message<?> message) throws MessagingException {
    Object payload = message.getPayload();
    if (payload instanceof LogEntry) {
        LogEntry logEntry = (LogEntry) payload;
        String app = (String) message.getHeaders().get("app");
        logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
        logEntryPostProcessService.postProcess(app, logEntry);
    } else {
        throw new MessageRejectedException(message, "Unknown data type has been received.");
    }
}
What I would like to have is something like:
@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}
In other words, the poller would pass all 10 messages in a single call instead of calling the method 10 times, once per message.
The reason is that I want to process the messages in bulk, one chunk at a time, to improve performance.
Upvotes: 9
Views: 6465
Reputation: 1837
Thanks to @Artem Bilan, here is the final solution:
<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store -->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!--
    The poller processes up to 100 messages every minute.
    The group is released when it reaches 100 messages (the poll hit max-messages-per-poll)
    or when the 60-second group timeout expires (the poll returned fewer than 100 messages).
    In both cases a single message whose payload is the list of messages is sent to the output channel.
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
        send-partial-result-on-expiry="true"
        group-timeout="60000"
        correlation-strategy-expression="T(Thread).currentThread().id"
        release-strategy-expression="size() == 100">
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>

<int:channel id="logEntryAggrChannel"/>
<!-- the payload is a list of log entries as result of the aggregator -->
<int:outbound-channel-adapter channel="logEntryAggrChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>
As the comment in the configuration above notes, I had to set group-timeout and send-partial-result-on-expiry because some groups were created for a thread ID but never released: they never reached the size() == 100 condition.
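To illustrate why the timeout matters, here is a minimal plain-Java sketch of the release logic described above. It is only a model under stated assumptions, not Spring Integration code: the class BatchGroupModel and its methods add and expire are hypothetical names, and expire stands in for what group-timeout with send-partial-result-on-expiry="true" achieves.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of correlation + release; NOT the Spring Integration aggregator.
final class BatchGroupModel<K, T> {
    private final int releaseSize;
    private final Map<K, List<T>> groups = new HashMap<>();

    BatchGroupModel(int releaseSize) {
        this.releaseSize = releaseSize;
    }

    /** Adds an item to its group; returns the full batch once the group
     *  reaches releaseSize (like size() == N), otherwise null. */
    List<T> add(K correlationKey, T item) {
        List<T> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(item);
        if (group.size() == releaseSize) {
            return groups.remove(correlationKey);
        }
        return null; // group stays pending until it fills up or expires
    }

    /** Models the timeout path: releases whatever the group holds, even if incomplete. */
    List<T> expire(K correlationKey) {
        List<T> partial = groups.remove(correlationKey);
        return partial == null ? new ArrayList<>() : partial;
    }

    /** Number of groups still waiting for release. */
    int pendingGroups() {
        return groups.size();
    }
}
```

Without the expire path, a group that holds fewer than releaseSize items would stay pending forever, which matches the behaviour of the unreleased groups described above.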
Upvotes: 4
Reputation: 121560
That's true, because of this loop in AbstractPollingEndpoint:
taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        int count = 0;
        while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
            ...
            if (!pollingTask.call()) {
                break;
            }
            ...
        }
    }
});
Hence all the messages of one poll (up to max-messages-per-poll) are handled within the same thread.
However, they are sent to the handler one by one, not as a single batch.
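The behaviour of that loop can be modelled in a few lines of plain Java. This is a simplified sketch, not the framework source: PollCycleModel and pollOnce are hypothetical names, and an in-memory Queue stands in for the pollable channel.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Simplified, hypothetical model of one poll cycle; NOT Spring's implementation.
final class PollCycleModel {

    /** Drains up to maxMessagesPerPoll items from the queue on the calling thread,
     *  invoking the handler once per item. A value <= 0 means "no limit".
     *  Returns the number of handler invocations. */
    static <T> int pollOnce(Queue<T> queue, int maxMessagesPerPoll, Consumer<T> handler) {
        int count = 0;
        while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
            T next = queue.poll();
            if (next == null) {
                break; // nothing left to receive, end this poll cycle early
            }
            handler.accept(next); // one call per message, never a batch
            count++;
        }
        return count;
    }
}
```

The handler is invoked once for every polled item, so the limit only caps how many single-message calls happen per cycle.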
To process them in parallel, you should put an ExecutorChannel in front of your logEntryPostProcessorReceiver. Something like this:
<channel id="executorChannel">
    <dispatcher task-executor="threadPoolExecutor"/>
</channel>

<bridge input-channel="logEntryChannel" output-channel="executorChannel">
    <poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>

<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>

UPDATE
To process the messages as one batch, you should aggregate them. Since they all come from a polling endpoint, the messages carry no sequenceDetails. You can work around that by using a fake value as the correlationId:
<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 10"/>
Here the 10 in size() == 10 should be equal to max-messages-per-poll.
After that, your logEntryPostProcessorReceiver has to accept the list of payloads, or a single message whose payload is the list produced by the <aggregator>.
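As a rough illustration of a receiver that accepts the aggregated list, here is a self-contained plain-Java sketch. Everything in it is an assumption made for the example: LogEntry is a stand-in for the type from the question, BatchLogEntryReceiver is a hypothetical class, and the recorded batch is a placeholder for a real bulk operation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the LogEntry type used in the question.
final class LogEntry {
    final String entityType;
    final String action;

    LogEntry(String entityType, String action) {
        this.entityType = entityType;
        this.action = action;
    }
}

// Hypothetical receiver that takes the whole aggregated payload list in one call.
final class BatchLogEntryReceiver {
    private final List<List<LogEntry>> processedBatches = new ArrayList<>();

    /** Validates every payload first, then hands the batch over in one step. */
    public void handleMessage(List<?> payloads) {
        List<LogEntry> batch = new ArrayList<>();
        for (Object payload : payloads) {
            if (!(payload instanceof LogEntry)) {
                throw new IllegalArgumentException("Unknown data type has been received.");
            }
            batch.add((LogEntry) payload);
        }
        // Placeholder for the real bulk call (for example a single bulk insert).
        processedBatches.add(batch);
    }

    /** Number of batches handled so far. */
    int batchCount() {
        return processedBatches.size();
    }

    /** Size of the most recently handled batch, or 0 if none was handled. */
    int lastBatchSize() {
        return processedBatches.isEmpty() ? 0 : processedBatches.get(processedBatches.size() - 1).size();
    }
}
```

Validating the whole list before processing means a single bad payload rejects the batch as a unit rather than leaving it half processed; whether that is the right policy depends on the application.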
Upvotes: 6