Sundara Balaji J K
Sundara Balaji J K

Reputation: 163

Spring Integration hangs, Aggregator not polling from its Input Channel

We are using Queue channels & aggregator backed with JdbcMessageStore.

All working good for some time and with more request the message from aggregator's input channel are not polled by the aggregator. After app restart the aggregation is happening.

Please help out. Are we out of thread or what is happening?

here is the configuration structure,

<channel id="startChannel">
	<queue message-store="channelStore" />
</channel>

<bridge input-channel="startChannel"
	output-channel="routerChannel" />

<channel id="routerChannel">
	<queue message-store="channelStore" />
</channel>

<int:router input-channel="routerChannel"
	expression="payload.status">
	<int:mapping value="chunk completed" channel="loadData" />
	<int:mapping value="job completed" channel="aggregateData" />
	<int:mapping value="Failed" channel="errorChannel" />
</int:router>

<task:executor id="workerThreadExecutor" pool-size="8"
	queue-capacity="40" rejection-policy="DISCARD" />

<channel id="loadData">
	<dispatcher task-executor="workerThreadExecutor" />
</channel>

<service-activator id="dataServiceActivator"
	input-channel="loadData" method="loadUserDetails"
	output-channel="aggregateData">
	<beans:bean class="com.sample.DataServiceActivator" />
</service-activator>

<channel id="aggregateData">
	<queue message-store="channelStore" />
</channel>

<aggregator id="aggregator" input-channel="aggregateData"
	output-channel="completionChannel"
	release-strategy="releaseStrategyBean"
	release-strategy-method="canRelease"
	correlation-strategy-expression="headers.userId" ref="aggregatorBean"
	method="aggregateChunks" send-partial-result-on-expiry="true"

	message-store="persistentMessageStore"
	expire-groups-upon-completion="true" group-timeout="7200000"
	expire-groups-upon-timeout="true" lock-registry="lockRegistry" />

<!--
-- releaseStrategyBean - release when data count equals to total data
sent by 'Job Completed' message.
-- aggregatorBean - with aggregated
messages generates a report - batch job - could take some time to
complete.
 
 -->

<channel id="completionChannel">
	<queue message-store="channelStore" />
</channel>
 

Upvotes: 0

Views: 275

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121560

It's hard to say what's going on only by the config, but looks like your rejection-policy="DISCARD" in not good: you are going to lose data when a new task is rejected internal queue with size 40 is full already. Consider to use CALLER_RUNS instead.

In addition it isn't clear why would one always has only queue channel in between... For example why routerChannel is a queue as well, when the startChannel is a queue already... There is just no reason to handle everything through the queue channel, when some processes can simply happen on the same thread and calling stack.

I worry you about queue channels also because all of them are pollable channels and there is somewhere some poller you configure. And that one is based on the TaskScheduler with the pool of only 10 threads by default: https://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/configuration.html#namespace-taskscheduler. So, this might bring you into the situation when all those scheduled threads are busy and there is no capacity to poll more messages from queue channels.

Upvotes: 1

Related Questions