Reputation: 24637
If I have one queue and multiple subscribers, how do I code the subscribers to remove only the messages they are interested in? I can use a PublishSubscribeChannel to send the message to all subscribers, but it has no filtering feature, and I'm not clear whether the messages are ever removed after delivery. Another option is to read all messages and filter in the subscriber, but then I need to invent a Kafka-ish behavior for message indexing to prevent already-seen messages from being processed again.
Upvotes: 2
Views: 1097
Reputation: 121560
Indeed, Spring Integration has no persistent topic abstraction out of the box. However, since you say you need an in-memory solution, consider starting an embedded ActiveMQ broker and using Jms.publishSubscribeChannel() backed by a Topic destination. There is still no selector on the Spring Integration subscriber side, even for this type of MessageChannel, but you can use .filter() to discard the messages you are not interested in.
You can achieve the same with a Hazelcast ITopic:
@Bean
public ITopic<Message<?>> siTopic() {
    return hazelcastInstance().getTopic("siTopic");
}

@Bean
public IntegrationFlow subscriber1() {
    return IntegrationFlows.from(
            Flux.create(messageFluxSink ->
                    siTopic()
                            .addMessageListener(message ->
                                    messageFluxSink.next(message.getMessageObject()))))
            // SpEL string literals need quotes; a bare foo would be read as a property
            .filter("headers.myHeader == 'foo'")
            // add .handle(...) or .channel(...) here to consume the accepted messages
            .get();
}

@Bean
public IntegrationFlow subscriber2() {
    return IntegrationFlows.from(
            Flux.create(messageFluxSink ->
                    siTopic()
                            .addMessageListener(message ->
                                    messageFluxSink.next(message.getMessageObject()))))
            .filter("headers.myHeader == 'bar'")
            // add .handle(...) or .channel(...) here to consume the accepted messages
            .get();
}
Actually, given your plain in-memory model, I would say that a simple QueueChannel bridged to a PublishSubscribeChannel, with the filter mentioned above in each subscriber, should be fully sufficient for you:
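@Bean
public PollableChannel queueChannel() {
    return new QueueChannel();
}

// queueChannel is pollable, so this bridge needs a poller:
// either a default poller bean or the poller attribute of @BridgeFrom
@Bean
@BridgeFrom("queueChannel")
public MessageChannel publishSubscribeChannel() {
    return new PublishSubscribeChannel();
}

@Bean
public IntegrationFlow subscriber1() {
    return IntegrationFlows.from(publishSubscribeChannel())
            // SpEL string literals need quotes; a bare foo would be read as a property
            .filter("headers.myHeader == 'foo'")
            // add .handle(...) or .channel(...) here to consume the accepted messages
            .get();
}

@Bean
public IntegrationFlow subscriber2() {
    return IntegrationFlows.from(publishSubscribeChannel())
            .filter("headers.myHeader == 'bar'")
            // add .handle(...) or .channel(...) here to consume the accepted messages
            .get();
}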
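To make the delivery semantics of this queue-plus-filter arrangement concrete, here is a minimal plain-Java model. It does not use Spring Integration at all; FilteredFanOut, Msg, Subscriber, drain and headerEquals are hypothetical names made up for illustration. The sketch assumes that polling removes each message from the queue exactly once and that every subscriber is then offered every message, keeping only those its filter accepts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Predicate;

public class FilteredFanOut {

    /** Hypothetical stand-in for a message: a payload plus string headers. */
    static final class Msg {
        final String payload;
        final Map<String, String> headers;

        Msg(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** A subscriber records only the payloads its filter accepts. */
    static final class Subscriber {
        final Predicate<Msg> filter;
        final List<String> received = new ArrayList<>();

        Subscriber(Predicate<Msg> filter) {
            this.filter = filter;
        }

        void onMessage(Msg m) {
            if (filter.test(m)) {
                received.add(m.payload);
            }
            // messages that fail the filter are silently discarded
        }
    }

    /** Builds a filter that matches when the named header equals the value. */
    static Predicate<Msg> headerEquals(String name, String value) {
        return m -> value.equals(m.headers.get(name));
    }

    /** Polls the queue until empty; each polled message is offered to all subscribers. */
    static void drain(Queue<Msg> queue, List<Subscriber> subscribers) {
        Msg m;
        while ((m = queue.poll()) != null) {
            for (Subscriber s : subscribers) {
                s.onMessage(m);
            }
        }
    }

    public static void main(String[] args) {
        Queue<Msg> queue = new ArrayDeque<>();
        queue.add(new Msg("a", Map.of("myHeader", "foo")));
        queue.add(new Msg("b", Map.of("myHeader", "bar")));

        Subscriber fooOnly = new Subscriber(headerEquals("myHeader", "foo"));
        Subscriber barOnly = new Subscriber(headerEquals("myHeader", "bar"));
        drain(queue, List.of(fooOnly, barOnly));

        System.out.println("foo subscriber: " + fooOnly.received);
        System.out.println("bar subscriber: " + barOnly.received);
        System.out.println("queue empty: " + queue.isEmpty());
    }
}
```

In this model nothing needs to be indexed or remembered: a message leaves the queue when it is polled, and a subscriber that is not interested simply drops its copy.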
UPDATE
One more option, instead of the PublishSubscribeChannel and filter combination, is a RecipientListRouter: https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#router-implementations-recipientlistrouter
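As a rough sketch of how that could look with the Java DSL, assuming Spring Integration 5.x on the classpath: the class name RecipientListConfig, the bean name routeByHeader, the channel name routingInput and the println handlers are placeholders invented for this example, not anything from the question. Each recipient carries its own selector expression, so a message is sent only to the sub-flows whose expression matches.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
@EnableIntegration
public class RecipientListConfig {

    // "routingInput" is a hypothetical channel name; send your messages there.
    @Bean
    public IntegrationFlow routeByHeader() {
        return IntegrationFlows.from("routingInput")
                .routeToRecipients(r -> r
                        // sub-flow invoked only when the header equals 'foo'
                        .recipientFlow("headers.myHeader == 'foo'",
                                f -> f.handle(m -> System.out.println("foo: " + m.getPayload())))
                        // sub-flow invoked only when the header equals 'bar'
                        .recipientFlow("headers.myHeader == 'bar'",
                                f -> f.handle(m -> System.out.println("bar: " + m.getPayload()))))
                .get();
    }
}
```

Compared with the PublishSubscribeChannel and filter combination, the selection logic lives in one place in the router instead of being repeated in every subscriber.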
Upvotes: 1