Abhijit Sarkar

Reputation: 24637

Spring Integration: how to read selected messages from a queue

If I have one queue and multiple subscribers, how do I code the subscribers to only remove the messages they are interested in? I can use a PublishSubscribeChannel to send the message to all subscribers, but it has no filtering feature, and I'm not clear if the messages are ever removed after delivery. Another option is to read all messages, and filter in the subscriber, but then I need to invent a Kafka-ish behavior for message indexing to prevent messages already seen being processed again.

Upvotes: 2

Views: 1097

Answers (1)

Artem Bilan

Reputation: 121560

Indeed, Spring Integration has no persistent topic abstraction out of the box. However, since you say you need an in-memory solution, how about starting an embedded ActiveMQ broker and using Jms.publishSubscribeChannel() backed by a Topic destination? There is still no selector support for Spring Integration subscribers, even with this type of MessageChannel, but you can use .filter() to discard the messages you are not interested in.

You can achieve the same with the Hazelcast ITopic:

    @Bean
    public ITopic<Message<?>> siTopic() {
        return hazelcastInstance().getTopic("siTopic");
    }

    // Each subscriber receives every message published to the topic
    // and keeps only those whose header matches.
    @Bean
    public IntegrationFlow subscriber1() {
        return IntegrationFlows.from(
                Flux.create(messageFluxSink ->
                        siTopic()
                                .addMessageListener(message ->
                                        messageFluxSink.next(message.getMessageObject()))))
                // SpEL string literals must be quoted
                .filter("headers.myHeader == 'foo'")
                .get();
    }

    @Bean
    public IntegrationFlow subscriber2() {
        return IntegrationFlows.from(
                Flux.create(messageFluxSink ->
                        siTopic()
                                .addMessageListener(message ->
                                        messageFluxSink.next(message.getMessageObject()))))
                .filter("headers.myHeader == 'bar'")
                .get();
    }

Actually, given your plain in-memory model, I would even say that a simple QueueChannel bridged to a PublishSubscribeChannel, with the filter mentioned above in each subscriber, should be enough for you:

    @Bean
    public PollableChannel queueChannel() {
        return new QueueChannel();
    }

    // Bridging from a pollable channel needs a poller: either a default
    // poller bean or the poller attribute of @BridgeFrom.
    @Bean
    @BridgeFrom("queueChannel")
    public MessageChannel publishSubscribeChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public IntegrationFlow subscriber1() {
        return IntegrationFlows.from(publishSubscribeChannel())
                // SpEL string literals must be quoted
                .filter("headers.myHeader == 'foo'")
                .get();
    }

    @Bean
    public IntegrationFlow subscriber2() {
        return IntegrationFlows.from(publishSubscribeChannel())
                .filter("headers.myHeader == 'bar'")
                .get();
    }

UPDATE

One more option, instead of the PublishSubscribeChannel and filter combination, is the RecipientListRouter: https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#router-implementations-recipientlistrouter
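As a rough sketch of the RecipientListRouter option with the Java DSL, something like the following should work. The class name and the channel names inputChannel, fooChannel and barChannel are made up for illustration, and the selector expressions simply reuse the myHeader values from the filter examples above. Treat it as an outline under those assumptions, not a tested configuration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class RecipientListConfig {

    // Hypothetical per-subscriber channels; each subscriber flow
    // would start from its own channel.
    @Bean
    public MessageChannel fooChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel barChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow routingFlow() {
        // "inputChannel" is an assumed name for the channel producers send to.
        return IntegrationFlows.from("inputChannel")
                .routeToRecipients(r -> r
                        // A recipient receives the message only when
                        // its SpEL selector evaluates to true.
                        .recipient("fooChannel", "headers.myHeader == 'foo'")
                        .recipient("barChannel", "headers.myHeader == 'bar'"))
                .get();
    }
}
```

The difference from the filter approach is that the selection happens once, in the router, so a subscriber never sees messages meant for someone else. Keep in mind that a message matching no recipient has nowhere to go unless the router has a default output channel, in which case delivery fails with an exception.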

Upvotes: 1
