I'm trying to start an embedded ActiveMQ broker from Java with virtual destinations enabled, but I think my configuration is wrong.
public static void main(String[] args) throws Exception {
    // Virtual Destination Interceptor
    VirtualTopic virtualTopic = new VirtualTopic();
    virtualTopic.setName("VirtualTopic.>");
    virtualTopic.setPrefix("Consumer.*.");
    VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
    virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[] {virtualTopic});

    // Create and Start the Broker
    BrokerService broker = new BrokerService();
    broker.setDestinationInterceptors(new DestinationInterceptor[] {virtualDestinationInterceptor});
    broker.addConnector("tcp://localhost:61616");
    broker.start();
}
The code above should enable virtual destinations, letting me send to the topic VirtualTopic.myTopicName and receive from Consumer.A.VirtualTopic.myTopicName. (Note that this replaces the activemq.xml file, which I do not have.)
From my code I'm sending with:
jmsTopicTemplate.send("VirtualTopic.myTopicName", session -> session.createTextMessage(jmsEvent));
And receiving with:
@JmsListener(destination = "Consumer.A.VirtualTopic.myTopicName", containerFactory = "jmsTopicListenerContainerFactory")
But I'm not receiving the message.
Through JConsole I can see that the message has been enqueued on VirtualTopic.myTopicName, but Consumer.A.VirtualTopic.myTopicName shows a dequeue count of 0.
If I change the @JmsListener to:
@JmsListener(destination = "VirtualTopic.myTopicName", containerFactory = "jmsTopicListenerContainerFactory")
then I receive 5 messages (as I have 5 consumers).
Does anybody have any suggestions? It really sounds like a configuration problem to me.
For completeness, these are my jmsTopicTemplate and jmsTopicListenerContainerFactory:
@Bean
public JmsTemplate jmsTopicTemplate(@Qualifier("activeMQConnectionFactory") ConnectionFactory connectionFactory) {
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.setPubSubDomain(true);
    return jmsTemplate;
}

@Bean
public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(@Qualifier("activeMQConnectionFactory") ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrency("4-5");
    factory.setPubSubDomain(true);
    return factory;
}
Upvotes: 0
Oh! I found the error.
The problem is in the JmsListenerContainerFactory bean:
It should no longer listen on a topic, because the consumer side of a virtual destination (here Consumer.A.VirtualTopic.myTopicName) is effectively a queue. I needed to remove factory.setPubSubDomain(true); with the following it works.
@Bean
public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(@Qualifier("activeMQConnectionFactory") ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrency("4-5");
    // No setPubSubDomain(true) here: Consumer.A.VirtualTopic.myTopicName is consumed as a queue
    return factory;
}
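To illustrate why the listener has to treat the destination as a queue, here is a minimal in-memory sketch in plain Java. It does not use ActiveMQ at all; the class VirtualTopicModel and its methods are hypothetical names made up for this illustration, and it is a toy model rather than a description of how the broker is implemented. It only shows the two delivery modes: a topic subscription hands every subscriber its own copy, while consumers on a Consumer.A.* queue compete for a single copy.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical class, not part of ActiveMQ or Spring.
public class VirtualTopicModel {

    // Queue name a consumer group reads from, following the
    // Consumer.<group>.<virtual topic> naming used in the question.
    static String consumerQueueName(String group, String virtualTopic) {
        return "Consumer." + group + "." + virtualTopic;
    }

    // Topic semantics: every subscriber receives its own copy of the message.
    static List<String> deliverAsTopic(String message, int subscribers) {
        List<String> deliveries = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            deliveries.add("subscriber-" + i + " got " + message);
        }
        return deliveries;
    }

    // Queue semantics: consumers compete, so one message is delivered once.
    static List<String> deliverAsQueue(String message, int consumers) {
        List<String> deliveries = new ArrayList<>();
        if (consumers > 0) {
            // Which consumer wins is up to the broker; index 0 is arbitrary here.
            deliveries.add("consumer-0 got " + message);
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println(consumerQueueName("A", "VirtualTopic.myTopicName"));
        System.out.println(deliverAsTopic("hello", 5).size()); // 5 copies, one per subscriber
        System.out.println(deliverAsQueue("hello", 5).size()); // 1 delivery shared by the group
    }
}
```

In this model, listening directly on VirtualTopic.myTopicName with 5 consumers corresponds to deliverAsTopic (5 deliveries, matching what I saw), whereas listening on Consumer.A.VirtualTopic.myTopicName corresponds to deliverAsQueue, which is why the container factory must not be in pub/sub mode.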
Furthermore, the ActiveMQ documentation says that virtual destinations are enabled by default, and that is indeed the case.
Therefore the code for starting ActiveMQ can be simplified to:
public static void main(String[] args) throws Exception {
    BrokerService broker = new BrokerService();
    broker.addConnector("tcp://localhost:61616");
    broker.start();
}
Upvotes: 3