İlkay Gunel

Reputation: 852

Spring Integration Best Practice To Receive Message From Queue Channel

I'm struggling to process messages that sit in a queue channel. The service activator does not fire immediately, and I need it to fire as soon as a message arrives on demoInputChannel.

I have a channel that is a queue channel:

<int:channel id="demoInputChannel">
    <int:queue/>
</int:channel>

I have a service activator to process the messages in demoInputChannel. Its definition looks like this:

<int:service-activator input-channel="demoInputChannel"
                       output-channel="demoOutputChannel"
                       ref="demoService"
                       method="demoMethod"
                       requires-reply="true"/>

I also have a poller that is defined globally in the project:

<int:poller default="true" fixed-delay="0" max-messages-per-poll="1"/>

How can I trigger the service activator immediately?

EDIT: I changed demoInputChannel as follows, to be sure it's using a dedicated poller.

@Bean
QueueChannel demoInputChannel() {
    return new QueueChannel(10);
}

@ServiceActivator(inputChannel = "demoInputChannel", outputChannel = "demoOutputChannel")
public Message<List<DataElement>> handle(Message<Long> in) throws Exception {
    System.out.println("item.getPayload " + in.getPayload());
    return autowiredObject.autowiredObjectMethod(in);
}

EDIT 2:

Here is my splitter method:

@Splitter(inputChannel = "splitterInputChannel", outputChannel = "demoInputChannel")
public List<Message<Long>> splitterHandle(Message<?> message) {
    List<Message<Long>> myList = new ArrayList<>();
    List<Long> idList = (List<Long>) message.getPayload();
    for (Long l : idList) {
        myList.add(org.springframework.integration.support.MessageBuilder.withPayload(l).build());
    }
    return myList;
}

Upvotes: 0

Views: 2409

Answers (1)

Artem Bilan

Reputation: 121550

Don't use max-messages-per-poll="1". Use -1 so that each poll processes all the messages currently in the queue, one right after another. If there are no messages, the polling task sleeps for the fixed-delay timeout. But since the work is distributed between different threads, there is still going to be some delay.

See more info in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-adapter-namespace-inbound. Look for the "Important: Poller Configuration" paragraph.
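To make the effect of the per-poll limit concrete, here is a rough plain-JDK model of it. This is not Spring Integration code: the class PollCycleSketch, its methods, and the 1000 ms delay are all hypothetical, and the model only assumes that the poller waits a fixed delay between poll cycles. It counts how many cycles are needed to drain 10 queued messages with a limit of 1 versus -1.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a poller draining a queue; not Spring Integration's implementation.
public class PollCycleSketch {

    /**
     * Counts the poll cycles needed to drain messageCount queued messages
     * when each cycle may receive at most maxMessagesPerPoll of them.
     * A negative limit means "no limit per cycle".
     */
    public static int cyclesToDrain(int messageCount, int maxMessagesPerPoll) {
        if (maxMessagesPerPoll == 0) {
            throw new IllegalArgumentException("a limit of 0 would never drain the queue");
        }
        Queue<Long> queue = new ArrayDeque<>();
        for (long id = 0; id < messageCount; id++) {
            queue.add(id);
        }
        int cycles = 0;
        while (!queue.isEmpty()) {
            cycles++;
            int received = 0;
            // Keep receiving within this cycle until the limit is hit or the queue is empty.
            while ((maxMessagesPerPoll < 0 || received < maxMessagesPerPoll)
                    && queue.poll() != null) {
                received++;
            }
        }
        return cycles;
    }

    /** Time the last message waits if the poller sleeps delayMs between cycles. */
    public static long lastMessageWaitMs(int cycles, long delayMs) {
        return Math.max(0, cycles - 1) * delayMs;
    }

    public static void main(String[] args) {
        long delayMs = 1000; // assumed delay between poll cycles
        int oneAtATime = cyclesToDrain(10, 1);
        int unlimited = cyclesToDrain(10, -1);
        System.out.println("limit 1:  " + oneAtATime + " cycles, last message waits "
                + lastMessageWaitMs(oneAtATime, delayMs) + " ms");
        System.out.println("limit -1: " + unlimited + " cycles, last message waits "
                + lastMessageWaitMs(unlimited, delayMs) + " ms");
    }
}
```

Under these assumptions, a limit of 1 needs one cycle per message, so any nonzero delay between cycles is paid once per message. A limit of -1 drains everything that is already queued in a single cycle.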

UPDATE

Here is a unit test to demonstrate that everything works as expected:

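import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

@SpringJUnitConfig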
public class So69573293Tests {

    @Autowired
    MessageChannel inputChannel;

    @Test
    void testConsumptionFromQueue() throws InterruptedException {
        String payload = IntStream.range(0, 10).mapToObj(Integer::toString).collect(Collectors.joining(","));
        this.inputChannel.send(new GenericMessage<>(payload));

        Thread.sleep(1000);
    }

    @Configuration
    @EnableIntegration
    public static class Config {

        @Splitter(inputChannel = "inputChannel", outputChannel = "demoInputChannel")
        List<Long> split(String payload) {
            return Arrays.stream(payload.split(","))
                    .map(Long::parseLong)
                    .collect(Collectors.toList());
        }


        @Bean
        QueueChannel demoInputChannel() {
            return new QueueChannel(10);
        }

        @ServiceActivator(inputChannel = "demoInputChannel", poller = @Poller(maxMessagesPerPoll = "-1", fixedRate = "1000"))
        public void handle(List<Long> in) throws InterruptedException {
            System.out.println("payload " + in);
            in.forEach(item -> System.out.println("item " + item));
        }

    }

}

The output is like this:

payload [0]
item 0
payload [1]
item 1
payload [2]
item 2
payload [3]
item 3
payload [4]
item 4
payload [5]
item 5
payload [6]
item 6
payload [7]
item 7
payload [8]
item 8
payload [9]
item 9

UPDATE 2

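import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

@SpringJUnitConfig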
public class So69573293Tests {

    @Autowired
    MessageChannel inputChannel;

    @Test
    void testConsumptionFromQueue() throws InterruptedException {
        List<Long> payload = LongStream.range(0, 10).boxed().collect(Collectors.toList());
        this.inputChannel.send(new GenericMessage<>(payload));

        Thread.sleep(1000);
    }

    @Configuration
    @EnableIntegration
    public static class Config {

        @Splitter(inputChannel = "inputChannel", outputChannel = "demoInputChannel")
        public List<Message<Long>> splitterHandle(Message<List<Long>> message) {
            List<Message<Long>> myList = new ArrayList<>();
            List<Long> idList = message.getPayload();
            for (Long l : idList) {
                myList.add(MessageBuilder.withPayload(l).build());
            }
            return myList;
        }


        @Bean
        QueueChannel demoInputChannel() {
            return new QueueChannel(10);
        }

        @ServiceActivator(inputChannel = "demoInputChannel", poller = @Poller(maxMessagesPerPoll = "-1", fixedRate = "1000"))
        public void handle(Message<Long> in) throws InterruptedException {
            System.out.println("payload " + in);
        }

    }

}

Output:

payload GenericMessage [payload=0, headers={sequenceNumber=1, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=d4715f90-c320-5555-78c2-cba815cdb2d2, sequenceSize=10, timestamp=1634243521339}]
payload GenericMessage [payload=1, headers={sequenceNumber=2, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=31e7f76f-b9e9-f53a-e5d4-82d620d6657f, sequenceSize=10, timestamp=1634243521339}]
payload GenericMessage [payload=2, headers={sequenceNumber=3, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=9083ae5d-60d6-7800-9a58-75eb3de17e56, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=3, headers={sequenceNumber=4, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=ec797609-9187-08ae-977b-7e11d7e451fc, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=4, headers={sequenceNumber=5, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=3d301b78-e6c1-9067-a644-7c951d0e484c, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=5, headers={sequenceNumber=6, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=1ec9223f-9060-0c8f-9b6f-1a3ef5a1d495, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=6, headers={sequenceNumber=7, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=b0231d74-6721-3f41-dc49-b8ce3179229e, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=7, headers={sequenceNumber=8, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=bd7508d9-429e-37f0-2353-3148008ca0a2, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=8, headers={sequenceNumber=9, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=1fe4625e-8807-e7c7-82a6-8867a6400790, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=9, headers={sequenceNumber=10, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=63bd12fb-0774-e187-a76f-f3936f71d7d4, sequenceSize=10, timestamp=1634243521340}]

Upvotes: 1
