Reputation: 4695
My application consists of many OSGi bundles running inside JBoss Fuse 6.2.1. Each bundle has a Camel route consuming from an ActiveMQ endpoint. Data is exchanged using VirtualTopics.
ProducerBundle publishes to topic VirtualTopic.MyTopic
ConsumerBundle A consumes from queue Consumer.A.VirtualTopic.MyTopic
ConsumerBundle B consumes from queue Consumer.B.VirtualTopic.MyTopic
ConsumerBundle C consumes from queue Consumer.C.VirtualTopic.MyTopic
At some point Consumer C is shut down and its bundle is uninstalled, never to come back. However, messages are still enqueued into the Consumer.C.VirtualTopic.MyTopic queue.
How do I destroy such a queue?
ActiveMQ pauses the producer when the queue fills up, and I cannot set a short time to live on the messages because other consumers may take a while to process each one. I cannot modify the VirtualTopic structure. I have full access to the ActiveMQ configuration and the Camel routes.
Are there any other options to handle the situation?
<!-- producer route -->
<route id="ProducerRoute">
    <from uri="direct:trigger"/>
    <to uri="activemq:topic:VirtualTopic.MyTopic"/>
</route>
<!-- each consumer route -->
<route id="ConsumerARoute">
    <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
    <to uri="bean:myProcessor"/>
</route>
Upvotes: 1
Views: 2467
Reputation: 3
I had a similar situation, but I couldn't use Claus's suggestion because my broker has other queues without consumers that I don't want to delete. I'm running JBoss Fuse 6.1.0 with Fabric (I think newer versions of Fuse behave the same way). I first removed the consumer (in my case, by removing the profile that contained it) and then deleted the queue with the delete button in the hawtio console.
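For anyone who would rather script those two steps than click through hawtio, here is a minimal sketch over JMX. It refuses to remove a queue that still has consumers attached. The class name `IdleQueueRemover`, the method `removeIfNoConsumers` and the `brokerName` parameter are made up for illustration. The `ConsumerCount` attribute and the ObjectName layout are, as far as I know, what ActiveMQ 5.8 and later expose, so verify them in your own JMX tree first.

```java
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper, not part of ActiveMQ or Fuse.
final class IdleQueueRemover {

    private IdleQueueRemover() {}

    /**
     * Removes the queue only if its MBean exists and reports zero consumers.
     * Returns true when removeQueue was invoked on the broker MBean.
     * Assumes broker and queue names contain no characters that need
     * escaping in a JMX ObjectName.
     */
    static boolean removeIfNoConsumers(MBeanServer server,
                                       String brokerName,
                                       String queueName) throws Exception {
        String base = "org.apache.activemq:type=Broker,brokerName=" + brokerName;
        ObjectName broker = new ObjectName(base);
        ObjectName queue = new ObjectName(
                base + ",destinationType=Queue,destinationName=" + queueName);

        // Nothing to do if the broker does not know this queue.
        if (!server.isRegistered(queue)) {
            return false;
        }

        // Leave the queue alone while something is still consuming from it.
        Number consumers = (Number) server.getAttribute(queue, "ConsumerCount");
        if (consumers.longValue() > 0) {
            return false;
        }

        server.invoke(broker,
                "removeQueue",
                new Object[]{queueName},
                new String[]{String.class.getName()});
        return true;
    }
}
```

A call would look like `IdleQueueRemover.removeIfNoConsumers(mBeanServer, "amq", "Consumer.C.VirtualTopic.MyTopic")`, where `amq` is only a guess at the broker name.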
Upvotes: -1
Reputation: 4695
I went for the aggressive solution: I hook into the OSGi bundle lifecycle, and when the bundle is stopped I use the JMX MBeanServer to destroy the now unneeded queues.
Since my bundle is managed with Blueprint, I opted for a bean with a destroy method.
Here's an example implementation:
My bean
package org.darugna.osgi;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueueDestroyer {

    // Queues owned by this bundle's consumer; removed when the bundle stops.
    private static final String[] QUEUES_TO_DESTROY = {
        "Consumer.A.VirtualTopic.MyTopic"
    };

    private MBeanServer mBeanServer;

    // Injected by Blueprint through the "mbeanServer" property.
    public void setMbeanServer(MBeanServer mBeanServer) {
        this.mBeanServer = mBeanServer;
    }

    // Blueprint destroy-method, called when the bean is destroyed.
    public void destroy() throws Exception {
        // Assumes the broker is named "amq"; adjust to match your broker.
        ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq");
        for (String queueName : QUEUES_TO_DESTROY) {
            // Invoke removeQueue(String) on the broker MBean.
            mBeanServer.invoke(brokerName,
                    "removeQueue",
                    new Object[]{queueName},
                    new String[]{String.class.getName()});
        }
    }
}
Blueprint.xml
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
    <reference id="mbeanServer" interface="javax.management.MBeanServer"
               availability="mandatory"/>
    <bean id="queueDestroyer" class="org.darugna.osgi.QueueDestroyer"
          destroy-method="destroy">
        <property name="mbeanServer" ref="mbeanServer"/>
    </bean>
    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
        <route>
            <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
            <to uri="bean:myProcessor"/>
        </route>
    </camelContext>
</blueprint>
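The bean above hardcodes `brokerName=amq`. If the broker name differs between installations, one possible variation is to look up the broker MBean with an ObjectName pattern instead. This is only a sketch: `BrokerQueues` and `removeQueueOnAllBrokers` are hypothetical names, and it assumes the broker MBeans follow the same `org.apache.activemq:type=Broker,brokerName=...` layout used in my bean.

```java
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper, not part of ActiveMQ or Fuse.
final class BrokerQueues {

    private BrokerQueues() {}

    /**
     * Invokes removeQueue(queueName) on every broker MBean registered in the
     * given MBeanServer and returns how many brokers were called.
     */
    static int removeQueueOnAllBrokers(MBeanServer server, String queueName)
            throws Exception {
        // The pattern has exactly two keys, so it matches broker MBeans only,
        // not the per-destination MBeans that carry additional keys.
        ObjectName pattern =
                new ObjectName("org.apache.activemq:type=Broker,brokerName=*");
        Set<ObjectName> brokers = server.queryNames(pattern, null);

        int called = 0;
        for (ObjectName broker : brokers) {
            server.invoke(broker,
                    "removeQueue",
                    new Object[]{queueName},
                    new String[]{String.class.getName()});
            called++;
        }
        return called;
    }
}
```

From `destroy()` this would be called once per entry in `QUEUES_TO_DESTROY`. If several brokers share the same MBeanServer, the call reaches all of them, which may or may not be what you want.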
Upvotes: 0