Reputation: 161
I am trying to use an ActiveMQ broker to deliver a message to two consumers listening on an automatic topic, employing Spring Integration facilities.
Here are my configuration beans (in common between publishers and subscribers):
@Value("${spring.activemq.broker-url}")
String brokerUrl;
@Value("${spring.activemq.user}")
String userName;
@Value("${spring.activemq.password}")
String password;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true); //!!
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true); //!!
return template;
}
Here are beans for consumers:
@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
.channel("jmsInputChannel").get();
}
//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg){
System.out.println("Received Message: " + msg);
}
And these are the beans for the producer:
@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
.destination("myTopic")
).get();
}
private static int counter = 1;
@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send() {
String s = "Message number " + counter;
counter++;
jmsOutputChannel().send(MessageBuilder.withPayload(s).build());
}
I am NOT using an embedded ActiveMQ broker. I am using one broker, one producer and two consumers each in their own (docker) container.
My problem is that, while I have invoked setPubSubDomain(true)
on both the JmsListenerContainerFactory
and the JmsTemplate
, my "topics" behave as queues: one consumer prints all the even-numbered messages, while the other prints all the odd-numbered ones.
In fact, by accessing the ActiveMQ web interface, I see that my "topics" (i.e. under the /topics.jsp page) are named ActiveMQ.Advisory.Consumer.Queue.myTopic
and ActiveMQ.Advisory.Producer.Queue.myTopic
, and "myTopic" does appear in the queues page (i.e. /queues.jsp).
The nodes get started in the following order:
The first "topic" that gets created is ActiveMQ.Advisory.Consumer.Queue.myTopic
, while the producer one appears only after the producer has started, obviously.
I am not an expert on ActiveMQ, so maybe the fact of my producer/consumer "topics" being named ".Queue" is just misleading. However, I do get the semantics described in the official ActiveMQ documentation for queues, rather than topics.
I have also looked at this question already, however all of my employed channels are already of the PublishSubscribeChannel kind.
What I need to achieve is having all messages delivered to all of my (possibly > 2) consumers.
UPDATE: I forgot to mention, my application.properties
file already does contain spring.jms.pub-sub-domain=true
, along with other settings.
Also, the version of Spring Integration that I am using is 4.3.12.RELEASE.
The problem is, I still get a RR-load-balanced semantics rather than a publish-subscribe semantics.
As for what I can see in the link provided by @Hassen Bennour, I would expect to get a ActiveMQ.Advisory.Producer.Topic.myTopic
and a ActiveMQ.Advisory.Consumer.Topic.myTopic
row on the list of all topics. Somehow I think I am not using well the Spring Integration libraries, and thus I am setting up a Queue when I want to set up a Topic.
UPDATE 2: Sorry about the confusion. jmsOutputChannel2
is in fact jmsOutputChannel
here, I have edited the main part. I am using a secondary "topic" in my code as a check, something for the producer to send message to and receive replies itself. The "topic" name differs as well, so... it's on a separate flow entirely.
I did achieve a little progress by changing the receiver flows in this way:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
//return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
//.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
.channel("jmsInputChannel").get();
}
This produces an advisory topic of type Consumer.Topic.myTopic
rather than Consumer.Queue.myTopic
on the broker, AND indeed a topic named just myTopic
(as I can see from the topics tab). However, once the producer starts, a Producer.Queue
advisory topic gets created, and messages get sent there while not being delivered.
The choice of adapter in the input flow seems to determine what kind of advisory consumer topic gets created (Topic vs Queue when switching to Jms.publishSubscribeChannel()
from Jms.messageDrivenChannelAdapter()
). However, I haven't been able to find something akin for the output flow.
UPDATE 3: Problem solved, thanks to @Hassen Bennour. Recap:
I wired the jmsTemplate()
in the producer's Jms.outboundAdapter()
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
.destination("myTopic")
).get();
}
And a more complex configuration for the consumer Jms.messageDrivenChannelAdapter()
:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
}
Though this is probably the smoothest and most flexible method, having such a bean...
@Bean
public Topic topic() {
return new ActiveMQTopic("myTopic");
}
to wire as a destination for the adapters, rather than just a String.
Thanks again.
Upvotes: 3
Views: 3902
Reputation: 3913
add spring.jms.pub-sub-domain=true to application.properties
or
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// the configurer will use PubSubDomain from application.properties if defined or false if not
//so setting it on the factory level need to be set after this
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
ActiveMQ.Advisory.Consumer.Queue.myTopic
is an Advisory topic for a Queue named myTopic
take a look here to read about Advisory
http://activemq.apache.org/advisory-message.html
UPDATE :
update your definitions like below
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jmsTemplate())
.destination("myTopic")
).get();
}
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
}
or define the Destination as a topic and replace destination("myTopic") by destination(topic())
@Bean
public Topic topic() {
return new ActiveMQTopic("myTopic");
}
Upvotes: 4