Screwtape

Reputation: 1367

Why does my ConsumerTemplate not read any messages from ActiveMQ Topic?

I'm trying to consume messages as a polling consumer, from time to time, from an ActiveMQ topic using a durable subscriber endpoint.

In my bean, I have a ConsumerTemplate from which I am trying to receive exchanges and send them on to another URI.

The bean method is:

public void pollConsumer() throws Exception {
    long count = 0;
    try {
        if ( consumerEndpoint == null ) {
            consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
        }
        logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
        consumer.start();
        producer.start();
        while ( true ) {
            logger.trace( "Awaiting message: " + ++count );
            Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
            if ( exchange == null ) break;
            logger.trace( "Processing message: " + count );
            producer.send( exchange );
            consumer.doneUoW( exchange );
            logger.trace( "Processed message: " + count );
        }
        producer.stop();
        consumer.stop();
    } catch ( Throwable t ) {
        logger.error( "Something went wrong!", t );
        throw t;
    }
}

When called, the logger displays the "Consuming" message in the form

activemq://topic:fromQueue.Name?clientId=MyClient&durableSubscriptionName=MyClient&selector=RecordType+IN+%28+%271%27%2C+%272%27+%29+AND+SubType+%3D+%272%27

which is correct as far as I can see (the selector should read RecordType IN ('1', '2') AND SubType = '2' without the URL encoding).
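One way to check what the encoded selector in the logged URI actually says is to decode it with the JDK. This is only a sketch: the class name SelectorDecoder is made up for illustration, and it assumes Java 10 or later for the Charset overload of URLDecoder.decode.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Camel or ActiveMQ.
public class SelectorDecoder {

    // Decodes a form-encoded query value: '+' becomes a space, %XX becomes the character.
    public static String decode(String encoded) {
        return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Selector value copied from the logged endpoint URI.
        String encoded = "RecordType+IN+%28+%271%27%2C+%272%27+%29+AND+SubType+%3D+%272%27";
        System.out.println(decode(encoded));
    }
}
```

The decoded value keeps the extra spaces inside the parentheses, which makes no difference to a JMS selector.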

I get a single "Awaiting message" log, and nothing else, so it appears that nothing is retrieved.

Bizarrely, it also doesn't register on ActiveMQ as a durable subscriber, so it appears that it isn't doing anything at all, but it's not registering any errors either, so I'm rather baffled.

Can anyone suggest why this might not be working, or at least where I should start looking?

Upvotes: 1

Views: 533

Answers (2)

Screwtape

Reputation: 1367

After noting @pcoates's answer and extending the timeout for testing purposes, I found that the real issue was that the durable subscription option on the URI wasn't being acted on. As there were no new messages on the topic during the 1-second wait, nothing happened.

The answer to another question relating to durable subscriptions explains that you can't use a durable subscription from a polling consumer.

My workaround is therefore to subscribe to the topic with a normal route, forward the messages to a new queue, and have the polling consumer read from that queue. It's not great, since I'd rather not have an additional queue, but it works and is less effort than writing a new version of JMSPollingConsumer.
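A minimal sketch of such a bridging route in the Camel Java DSL might look like the following. It assumes camel-core and the ActiveMQ Camel component are on the classpath and that an "activemq" component is already configured. The class name TopicToQueueBridge and the queue name polling.buffer are hypothetical; the topic name, clientId and durableSubscriptionName are taken from the question, and the selector is left out for brevity.

```java
import org.apache.camel.builder.RouteBuilder;

// Hypothetical route: an event-driven durable subscriber that copies
// topic messages onto a queue the polling consumer can read later.
public class TopicToQueueBridge extends RouteBuilder {

    @Override
    public void configure() {
        from("activemq:topic:fromQueue.Name?clientId=MyClient&durableSubscriptionName=MyClient")
            // Assumed queue name; any queue dedicated to the polling bean would do.
            .to("activemq:queue:polling.buffer");
    }
}
```

The bean's endpointUri would then point at the queue (here activemq:queue:polling.buffer) rather than at the topic.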

Upvotes: 0

pcoates

Reputation: 2307

Your pollConsumer will stop if it has to wait more than a second for a message to be on the queue/topic.

It waits 1 second for a message, after which it returns null and will break out of the while loop and stop the consumer.

    Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
==> if ( exchange == null ) break;
    logger.trace("Processing message: " + count );
    producer.send( exchange );
    consumer.doneUoW( exchange );
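The same receive-then-break pattern can be reproduced without Camel or a broker. The sketch below is an analogy only: it uses a plain java.util.concurrent.BlockingQueue in place of the JMS endpoint, and the class name PollingLoopDemo is made up for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the polling loop; no Camel classes involved.
public class PollingLoopDemo {

    // Reads messages until one poll times out, then returns how many were read.
    public static int drain(BlockingQueue<String> source, long timeoutMillis) {
        int processed = 0;
        while (true) {
            String message;
            try {
                // Like consumer.receive(endpoint, timeout): null means nothing arrived in time.
                message = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
                break;
            }
            if (message == null) break; // timed out, so the loop ends
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        // An empty source ends the loop after a single timed-out poll.
        System.out.println(drain(new LinkedBlockingQueue<>(), 100));
        // A source holding two messages is drained, then the third poll times out.
        System.out.println(drain(new LinkedBlockingQueue<>(List.of("a", "b")), 100));
    }
}
```

With nothing waiting when the loop starts, the first poll times out and the method returns immediately, which matches the single "Awaiting message" line in the question's log.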

It would be easier just to use an Apache Camel route to do what you describe.

Upvotes: 1
