daemon_nio

Reputation: 1566

Start ActiveMQ from java with virtual consumer enabled (virtual consumer not working)

I'm trying to start ActiveMQ from Java with virtual destinations enabled, but I think my configuration is wrong.

public static void main(String[] args) throws Exception {
    // Virtual Destination Interceptor
    VirtualTopic virtualTopic = new VirtualTopic();
    virtualTopic.setName("VirtualTopic.>");
    virtualTopic.setPrefix("Consumer.*.");
    VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
    virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[] {virtualTopic});

    // Create and Start the Broker
    BrokerService broker = new BrokerService();
    broker.setDestinationInterceptors(new DestinationInterceptor[] {virtualDestinationInterceptor});
    broker.addConnector("tcp://localhost:61616");
    broker.start();
}

The above should enable virtual destinations and allow me to send to the topic VirtualTopic.myTopicName and receive from Consumer.A.VirtualTopic.myTopicName. (Note that this replaces the activemq.xml, which I do not have.)
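To make the naming convention concrete, here is a minimal sketch of how the consumer-side name is derived from the topic name under the default prefix (Consumer.*.) and the VirtualTopic.> pattern. The VirtualTopicNames class and its consumerQueue method are hypothetical helpers written for illustration; they are not part of ActiveMQ.

```java
// Hypothetical helper, not an ActiveMQ API: it only illustrates the default naming convention.
final class VirtualTopicNames {
    private VirtualTopicNames() {}

    // Builds the destination a named consumer reads from,
    // e.g. ("A", "VirtualTopic.myTopicName") gives "Consumer.A.VirtualTopic.myTopicName".
    static String consumerQueue(String consumerName, String virtualTopic) {
        if (!virtualTopic.startsWith("VirtualTopic.")) {
            throw new IllegalArgumentException(
                    "Not a virtual topic under the default naming: " + virtualTopic);
        }
        return "Consumer." + consumerName + "." + virtualTopic;
    }

    public static void main(String[] args) {
        // Two independent consumer groups attached to the same virtual topic.
        System.out.println(consumerQueue("A", "VirtualTopic.myTopicName"));
        System.out.println(consumerQueue("B", "VirtualTopic.myTopicName"));
    }
}
```

Each distinct consumer name (A, B, ...) gets its own destination, so every group sees every message published to the topic.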


From my code I'm sending with:

jmsTopicTemplate.send("VirtualTopic.myTopicName", session -> session.createTextMessage(jmsEvent));

And receiving with:

@JmsListener(destination = "Consumer.A.VirtualTopic.myTopicName", containerFactory = "jmsTopicListenerContainerFactory")

But I'm not receiving the message.

Through jconsole I see that the message has been enqueued to VirtualTopic.myTopicName, but Consumer.A.VirtualTopic.myTopicName has a dequeue count of 0.

If I change the @JmsListener to:

@JmsListener(destination = "VirtualTopic.myTopicName", containerFactory = "jmsTopicListenerContainerFactory")

Then I receive the message 5 times (as I have 5 consumers).


Does anybody have any suggestions? It really looks like a configuration problem to me.



For completeness, here are my jmsTopicTemplate and jmsTopicListenerContainerFactory:

@Bean
public JmsTemplate jmsTopicTemplate(@Qualifier("activeMQConnectionFactory") ConnectionFactory connectionFactory) {
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.setPubSubDomain(true);
    return jmsTemplate;
}

@Bean
public JmsListenerContainerFactory jmsTopicListenerContainerFactory(@Qualifier("activeMQConnectionFactory") ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrency("4-5");
    factory.setPubSubDomain(true);
    return factory;
}

Upvotes: 0

Views: 1550

Answers (1)

daemon_nio

Reputation: 1566

Oh! I found the error.

The error is in the JmsListenerContainerFactory bean:

It should no longer listen on a topic, because the consumer side of a virtual destination (Consumer.A.VirtualTopic.myTopicName) is effectively a queue. I needed to remove factory.setPubSubDomain(true); with the following it works.

@Bean
public JmsListenerContainerFactory jmsTopicListenerContainerFactory(@Qualifier("activeMQConnectionFactory") ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrency("4-5");
    return factory;
}



Furthermore, the ActiveMQ documentation says that virtual destinations are enabled by default, and that is indeed the case.

Therefore the code for starting ActiveMQ can be simplified to:

public static void main(String[] args) throws Exception {
    BrokerService broker = new BrokerService();
    broker.addConnector("tcp://localhost:61616");
    broker.start();
}
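As a side note on why the topic listener saw the message 5 times while a queue listener sees it once: with topic semantics every subscriber gets its own copy, whereas consumers on a queue compete for each message. The sketch below is a toy in-memory model of those two delivery modes, not ActiveMQ code; DeliveryModel and all of its methods are made-up names, and round-robin is just a simple stand-in for however the broker actually picks a consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: it mimics delivery semantics and does not talk to any broker.
final class DeliveryModel {
    private DeliveryModel() {}

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    // Topic semantics: every subscriber receives its own copy of each message.
    static List<List<String>> publishToTopic(int subscribers, List<String> messages) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // Queue semantics: consumers compete, so each message goes to exactly one of them.
    // Round-robin is an assumption made here for simplicity.
    static List<List<String>> sendToQueue(int consumers, List<String> messages) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    // Counts how many deliveries happened across all consumers.
    static int totalDeliveries(List<List<String>> inboxes) {
        int total = 0;
        for (List<String> inbox : inboxes) {
            total += inbox.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("jmsEvent");
        // 5 concurrent listeners on a topic: the single message is delivered 5 times.
        System.out.println(totalDeliveries(publishToTopic(5, messages))); // prints 5
        // 5 concurrent listeners on a queue: the single message is delivered once.
        System.out.println(totalDeliveries(sendToQueue(5, messages)));    // prints 1
    }
}
```

This is why the listener container factory for Consumer.A.VirtualTopic.myTopicName has to use queue semantics: the concurrent consumers then share the work instead of each processing the same message.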

Upvotes: 3
