Reputation: 63
I have a queue channel with unbounded capacity. I can limit the capacity; however, my question is about the order in which messages are processed. If the queue holds, say, 100,000 messages and new messages are still arriving, will the output channel receive the newly arrived message first, or will messages be taken from the queue in FIFO order?
<channel id="DeliveryProcessChannel">
<queue/>
</channel>
<aggregator input-channel="DeliveryProcessChannel" output-channel="EmailDispatchChannel"
discard-channel="EmailDispatchChannel"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true"
group-timeout="1000"
correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 1000">
<poller max-messages-per-poll="1000" fixed-rate="1000"/>
</aggregator>
Upvotes: 0
Views: 544
Reputation: 121550
The QueueChannel is fully based on a FIFO algorithm.
To be precise: the in-memory (default) one is fully based on the LinkedBlockingQueue.
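To illustrate the FIFO behaviour with the JDK class alone, here is a minimal sketch. It does not use Spring Integration at all; the class name FifoOrderSketch and the string payloads are made up for the example. It only shows that an element added late is polled after the backlog that was already waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: plain JDK queue, no Spring Integration involved.
class FifoOrderSketch {

    // Enqueues a small backlog plus a late arrival, then returns the poll order.
    static List<String> drainOrder() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); // no capacity limit given
        queue.offer("old-1");
        queue.offer("old-2");
        queue.offer("new-3"); // arrives while the backlog is still waiting

        List<String> polled = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            polled.add(next);
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

The late arrival comes out last, so with a backlog of 100,000 messages a new message would wait behind all of them.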
All the persistent implementations rely on the FIFO semantics of the underlying store. For example, the JDBC one uses this query:
@Override
public String getPollFromGroupQuery() {
return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " +
"where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " +
"order by CREATED_DATE, MESSAGE_SEQUENCE FETCH FIRST ROW ONLY";
}
If the queue has reached its capacity, this logic is performed:
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e, long timeout, TimeUnit unit)
which is called from QueueChannel.send(), which returns a boolean to indicate whether the message has been offered to the queue or not.
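The timed offer can be tried out directly on a bounded JDK queue. The sketch below is only an illustration under assumptions: the capacity of 1, the 50 ms timeout and the class name BoundedOfferSketch are all made up, and nothing polls the queue, so the second offer has to time out.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows the boolean result of a timed offer on a full queue.
class BoundedOfferSketch {

    // Offers two elements to a queue of capacity 1 and returns both results.
    static boolean[] offerTwice() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            boolean first = queue.offer("m1", 50, TimeUnit.MILLISECONDS);  // space is available
            boolean second = queue.offer("m2", 50, TimeUnit.MILLISECONDS); // queue is full, wait elapses
            return new boolean[] {first, second};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while offering", e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = offerTwice();
        System.out.println(results[0] + " " + results[1]);
    }
}
```

The first offer succeeds and the second one fails after the wait, and the queue still holds only the first element. A caller that ignores the boolean would lose the second message silently.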
If you would like to change the order in which messages in a queue channel are processed, you can provide your own Queue implementation:
/**
* Create a channel with the specified queue.
*
* @param queue The queue.
*/
public QueueChannel(Queue<Message<?>> queue) {
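As one possible custom Queue, the sketch below builds a newest-first (LIFO) queue from JDK classes only. It is an assumption-laden illustration: the class name NewestFirstQueueSketch and the helper newestFirstQueue() are invented for the example, and the queue is exercised on its own rather than inside a channel. Note that the view returned by Collections.asLifoQueue is a plain Queue, not a BlockingQueue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class: a LIFO view over a thread-safe deque.
class NewestFirstQueueSketch {

    // Returns a Queue whose poll() yields the most recently offered element.
    static <T> Queue<T> newestFirstQueue() {
        return Collections.asLifoQueue(new LinkedBlockingDeque<T>());
    }

    // Enqueues three elements and returns the order in which they are polled.
    static List<String> drainOrder() {
        Queue<String> queue = newestFirstQueue();
        queue.offer("old-1");
        queue.offer("old-2");
        queue.offer("new-3");

        List<String> polled = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            polled.add(next);
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

A queue of this kind, typed as Queue<Message<?>>, is what would be passed to the constructor shown above. With a steady inflow, newest-first ordering can leave old messages waiting indefinitely, so it is a trade-off rather than a drop-in improvement.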
There is also a PriorityChannel in Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-prioritychannel
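The idea of priority ordering can be sketched with the JDK's PriorityBlockingQueue. This is not the PriorityChannel implementation, only a rough analogy: the Item class, its priority field and the rule that a larger number is polled first are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class: priority ordering with a plain JDK queue.
class PriorityOrderSketch {

    // Hypothetical stand-in for a message that carries a priority value.
    static final class Item {
        final String payload;
        final int priority;

        Item(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    // Enqueues items in arrival order and returns the payloads in poll order.
    static List<String> drainOrder() {
        // Assumption for this sketch: a larger priority value is polled first.
        PriorityBlockingQueue<Item> queue = new PriorityBlockingQueue<>(
                11, Comparator.comparingInt((Item i) -> i.priority).reversed());
        queue.offer(new Item("low", 1));
        queue.offer(new Item("high", 9));
        queue.offer(new Item("medium", 5));

        List<String> polled = new ArrayList<>();
        Item next;
        while ((next = queue.poll()) != null) {
            polled.add(next.payload);
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

The priorities here are distinct on purpose: PriorityBlockingQueue gives no ordering guarantee between elements of equal priority.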
Upvotes: 1