Alessandro Da Rugna
Alessandro Da Rugna

Reputation: 4695

Camel ActiveMQ how to destroy/purge a virtual topic queue without consumers

My application consists of many OSGi bundles running inside JBoss Fuse 6.2.1. Each bundle has a Camel route consuming from an ActiveMQ endpoint. Data is exchanged using VirtualTopics.

ProducerBundle publishes to topic VirtualTopic.MyTopic

ConsumerBundle A consumes from queue Consumer.A.VirtualTopic.MyTopic
ConsumerBundle B consumes from queue Consumer.B.VirtualTopic.MyTopic
ConsumerBundle C consumes from queue Consumer.C.VirtualTopic.MyTopic

At a certain moment in time Consumer C is closed, its bundle uninstalled and will never come back. Howewer, messages are still enqueued into Consumer.C.VirtualTopic.MyTopic queue.
How do I destroy such queue?

ActiveMQ pauses the Producer when the queue fills up, and I cannot set a small time to live on the message as other consumers may take a while to process each message. I cannot modify the VirtualTopic structure. I have full access to ActiveMQ configuration and Camel routes.
Are there any other options to handle the situation?

<!-- producer route -->
<route id="ProducerRoute"/>
    <from uri="direct:trigger"/>
    <to uri="activemq:topic:VirtualTopic.MyTopic"/>
</route>

<!-- each consumer route -->
<route id="ConsumerARoute">
    <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
    <to uri="bean:myProcessor"/>
</route>

Upvotes: 1

Views: 2467

Answers (2)

Agostino Leoni
Agostino Leoni

Reputation: 3

I had a similar situation, but I couldn't use the Claus's suggestion because in my broker there are other queues that have no consumer and I doesn't want to delete them. In my case I'm running a JBoss Fuse 6.1.0 with fabric (I think that is the same with newer version of Fuse): I just removed the consumer (in my case I just removed the profile with the consumer) and after that I deleted the queue with the delete button in the hawtio console.

Upvotes: -1

Alessandro Da Rugna
Alessandro Da Rugna

Reputation: 4695

I went for the aggressive solution: I hook into OSGi bundle lifecycle, when it is stopped I use JMX MBeanServer to destroy the now unneeded queues.

Since my bundle is managed using blueprint, I opted for a bean with a destroy method.
Here's an example implementation:

My bean

package org.darugna.osgi;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueueDestroyer {

    private static final String[] QUEUES_TO_DESTROY = {
        "Consumer.A.VirtualTopic.MyTopic"
    };

    private MBeanServer mBeanServer;

    public void setMbeanServer(MBeanServer mBeanServer) {
        this.mBeanServer = mBeanServer;
    }

    public void destroy() throws Exception {
        ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq");
        for (String queueName : QUEUES_TO_DESTROY) {
            Object returnValue = mBeanServer.invoke(brokerName,
                    "removeQueue",
                    new Object[]{queueName},
                    new String[]{String.class.getName()});
        }
    }

}

Blueprint.xml

<blueprint>

    <reference id="mbeanServer" interface="javax.management.MBeanServer" 
               availability="mandatory"/>

    <bean id="queueDestroyer" class="org.darugna.osgi.QueueDestroyer" 
          destroy-method="destroy">
        <property name="mbeanServer" ref="mbeanServer"/>
    </bean>

    <camelContext>
        <route>
            <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
            <to uri="bean:myProcessor"/>
        </route>
    <camelContext>

</blueprint>    

Upvotes: 0

Related Questions