Ekaterina

Reputation: 1892

Why are not all the messages polled from QueueChannel?

I created a QueueChannel with capacity=500 and sent 1000 messages to it. Not all of them are printed; the last one printed is number 567. Why is that?

Here is the code:

@SpringBootApplication
@IntegrationComponentScan
public class QueueChannelResearch {

    @Bean
    public IntegrationFlow lambdaFlow() {
        return f -> f.channel(c -> c.queue(500))
            .handle(System.out::println);
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(QueueChannelResearch.class, args);

        MessageChannel inputChannel = ctx.getBean("lambdaFlow.input", MessageChannel.class);
        for (int i = 0; i < 1000; i++) {
            inputChannel.send(MessageBuilder.withPayload("w" + i)
                .build());
        }

        ctx.close();

    }

}

Here is the output:

GenericMessage [payload=w1, headers={id=d97946f2-1cf6-d681-fa88-08a4e708e61e, timestamp=1541524850590}]
...
GenericMessage [payload=w566, headers={id=d97946f2-1cf6-d681-fa88-08a4e708e61e, timestamp=1541524850590}]
GenericMessage [payload=w567, headers={id=83ab8720-f1c1-a4b1-b2ac-2a24a93bd00c, timestamp=1541524850590}]

Upvotes: 0

Views: 41

Answers (1)

Artem Bilan

Reputation: 121177

Since messages are polled from the queue in a separate scheduler thread, you really need to wait until all of them have been handled before closing the context.

Since you don't have any hook in your application to track messages, I can only suggest adding a Thread.sleep(10000) before that ctx.close().

Or you can add a hook that waits for user input in the console:

    System.out.println("Hit 'Enter' to terminate");
    System.in.read();
    ctx.close();

Or just don't close the ctx and rely on the JVM termination.
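
A "hook to track messages" could, for example, be a CountDownLatch that the handler counts down for every message, so the main thread waits exactly as long as needed instead of sleeping for a fixed time. The following is only a minimal plain-JDK sketch of that idea, not Spring Integration code: the class LatchedQueueDrain, the method sendAndAwait and the poller thread are hypothetical stand-ins for the flow, the QueueChannel and the scheduler thread from the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Plain-JDK stand-in for the flow in the question; all names here are hypothetical.
class LatchedQueueDrain {

    // Sends `total` messages through a bounded queue and returns those the poller handled.
    static List<String> sendAndAwait(int capacity, int total, long timeoutSeconds) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(total); // the "hook": one count per message

        // Separate consumer thread, playing the role of the scheduled poller.
        Thread poller = new Thread(() -> {
            try {
                while (true) {
                    handled.add(queue.take());
                    latch.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        });
        poller.start();

        try {
            for (int i = 0; i < total; i++) {
                queue.put("w" + i); // blocks while the queue is full
            }
            // Wait for the consumer to catch up before shutting down.
            boolean drained = latch.await(timeoutSeconds, TimeUnit.SECONDS);
            poller.interrupt(); // stands in for ctx.close()
            poller.join();
            if (!drained) {
                throw new IllegalStateException(
                    "Timed out with " + latch.getCount() + " messages unhandled");
            }
        } catch (InterruptedException e) {
            poller.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while sending or waiting", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> handled = sendAndAwait(500, 1000, 10);
        System.out.println("Handled " + handled.size() + " messages");
    }
}
```

In the flow from the question, the same idea would presumably mean replacing System.out::println with a handler lambda that prints the message and then calls latch.countDown(), and calling latch.await(...) before ctx.close().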

Upvotes: 2
