Neer1009

Reputation: 314

Camel Route is running infinitely to move JMS message

I am trying to move messages from queue1 (a dead letter queue) to queue2 in ActiveMQ at a periodic interval of 5 minutes using a Camel route. I am using the code below to achieve this:

    public class MessageRouteBuilder extends RouteBuilder {

    private static final Logger LOG =
            LoggerFactory.getLogger(MessageRouteBuilder.class);

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.camel.builder.RouteBuilder#configure()
     */
    @Override
    public void configure() throws Exception {
        LOG.info("Routing of camel is started");
        CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
        startPolicy.setRouteStartTime("0 0/5 * * * ?");

        from(
            "jms:queue:DLQ.Consumer.OUTDOCS.VirtualTopic.queue1")
                .routeId("DLQMessageMoverID").routePolicy(startPolicy)
                .noAutoStartup()
                .to("jms:queue:Consumer.OUTDOCS.VirtualTopic.queue1");
        LOG.info("Routing of camel is done");

    }

}


@Startup
@Singleton
public class ScheduledMessageDLQConsumer {

    @Inject
    private MessagingUtil msgUtil;

    @Inject
    private MessageRouteBuilder builder;

    private static final Logger LOG =
            LoggerFactory.getLogger(ScheduledMessageDLQConsumer.class);
    @PostConstruct
    public void init() {
        LOG.info("camel Scheduling scheduled started");
        CamelContext camelContext = new DefaultCamelContext();
        ConnectionFactory connectionFactory = msgUtil.getAMQConnectionFactory();
        camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        try {
            camelContext.addRoutes(builder);
            camelContext.start();
            LOG.info("Camel scheduling completed");
        } catch (Exception e) {
            // TODO Auto-generated catch block

            LOG.error("Error in registering camel route builder", e);
        }

        LOG.info(" camel Scheduling scheduled completed");
    }

}

The problem is this: the Camel route gets enabled after 5 minutes. It moves the message from the DLQ (DLQ.Consumer.OUTDOCS.VirtualTopic.queue1) to queue1 (Consumer.OUTDOCS.VirtualTopic.queue1). But if the message is poison, it comes back to the DLQ, the route moves it from the DLQ to the normal queue again, and this process keeps running indefinitely.

My requirement is that the route should move a message from the DLQ to the queue only once every 5 minutes. If a poison message comes back to the DLQ, it should be picked up again only after the next 5 minutes.

Upvotes: 1

Views: 1065

Answers (1)

Bedla

Reputation: 4939

Firstly, your whole idea looks like bad design. Reprocessing and redelivery should be handled on the consumer or the broker, without any obscure periodic "DLQMessageMover". If the application consuming from OUTDOCS.VirtualTopic.queue1 is under your control, rethink your approach to error handling.

BTW, a simple combination of maximumRedeliveries=-1 and redeliveryDelay=300000 on the consumer connection would have the same effect as all the code you have written in this question.
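One way to apply those two settings is through the broker URL, which could look like the sketch below. It is only an illustration under assumptions: the `tcp://localhost:61616` address and the helper name `withFixedRedelivery` are made up, and it relies on the ActiveMQ client applying `jms.`-prefixed URL options to the connection factory. `initialRedeliveryDelay` is added here on the assumption that the first redelivery uses it rather than `redeliveryDelay`; check this against the client version you run.

```java
// Plain-Java sketch: only builds the URL string, it does not connect to a broker.
final class RedeliveryUrlSketch {

    /** Appends fixed-delay redelivery options to a broker URL (hypothetical helper). */
    static String withFixedRedelivery(String brokerUrl, int maximumRedeliveries, long delayMillis) {
        // Use '&' if the URL already carries options, '?' otherwise
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator
                + "jms.redeliveryPolicy.maximumRedeliveries=" + maximumRedeliveries
                + "&jms.redeliveryPolicy.initialRedeliveryDelay=" + delayMillis
                + "&jms.redeliveryPolicy.redeliveryDelay=" + delayMillis;
    }

    public static void main(String[] args) {
        // -1 = redeliver without limit, 300000 ms = 5 minutes between attempts
        System.out.println(withFixedRedelivery("tcp://localhost:61616", -1, 300_000L));
    }
}
```

The resulting string would be passed to the connection factory that `msgUtil.getAMQConnectionFactory()` creates in the question, so the poison message never leaves the consumer's redelivery loop and no separate mover route is needed.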

Secondly, you need an idempotent consumer that uses the JMSCorrelationID header as the correlation key. It processes every correlation ID only once. A MemoryIdempotentRepository is cleared on route restart, so the message is processed again after each restart, which fits your requirement.

I have created a small example to show how it works. In your case there will be no mocking of the JMSCorrelationID header, and the jms component will be used instead of the timer.

public class IdempotentConsumerRouteBuilder extends RouteBuilder {
    // In-memory repository of the correlation IDs that were already processed
    private final IdempotentRepository idempotentRepository = new MemoryIdempotentRepository();
    private final List<String> mockCorrelationIds = Arrays.asList("id0","id0","id0","id1","id2","id0","id4","id0","id6","id7");

    @Override
    public void configure() {
        CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
        startPolicy.setRouteStopTime("0 0/5 * * * ?");
        startPolicy.setRouteStartTime("0 0/5 * * * ?");

        from("timer:jms?period=100")
                .routePolicy(startPolicy)
                .process(e -> e.getIn().setHeader(
                        "JMSCorrelationID", // Mock JMSCorrelationID so the timer behaves as if it were the jms component
                        mockCorrelationIds.get(e.getProperty("CamelTimerCounter", Integer.class) % 10))
                )
                // Let each correlation ID through only once until the repository is cleared
                .idempotentConsumer(header("JMSCorrelationID"), idempotentRepository)
                .log("correlationId is ${header.JMSCorrelationID}")
                .to("log:done?level=OFF")
                .end();
    }
}

And the output of this code:

[artzScheduler-camel-1_Worker-3] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id0
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id1
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id2
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id4
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id6
[mel-1) thread #4 - timer://jms] route1                         INFO  correlationId is id7
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy        INFO  Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)
[el-1) thread #5 - ShutdownTask] DefaultShutdownStrategy        INFO  Route: route1 shutdown complete, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy        INFO  Graceful shutdown of 1 routes completed in 0 seconds
[artzScheduler-camel-1_Worker-6] DefaultCamelContext            INFO  Route: route1 is stopped, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-8] ScheduledRoutePolicy           WARN  Route is not in a started/suspended state and cannot be stopped. The current route state is Stopped
[artzScheduler-camel-1_Worker-7] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id0
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id1
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id2
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id4
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id6
[mel-1) thread #6 - timer://jms] route1                         INFO  correlationId is id7
[rtzScheduler-camel-1_Worker-10] DefaultShutdownStrategy        INFO  Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)
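
The filtering visible in this log can be reproduced without Camel. The following is a minimal plain-Java sketch of the idea, not Camel's MemoryIdempotentRepository: the class `IdempotentFilterSketch` and its methods are hypothetical names, and `clear()` merely stands in for the repository being emptied when the route restarts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: a set of already-seen correlation IDs that can be reset.
final class IdempotentFilterSketch {
    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time an ID is offered since the last clear(). */
    boolean firstTime(String correlationId) {
        return seen.add(correlationId);
    }

    /** Stands in for the repository being emptied on route restart. */
    void clear() {
        seen.clear();
    }

    /** Keeps only the IDs that pass the filter, in arrival order. */
    List<String> process(List<String> correlationIds) {
        List<String> passed = new ArrayList<>();
        for (String id : correlationIds) {
            if (firstTime(id)) {
                passed.add(id);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        // Same mock IDs as in the route above
        List<String> ids = Arrays.asList("id0","id0","id0","id1","id2","id0","id4","id0","id6","id7");
        IdempotentFilterSketch filter = new IdempotentFilterSketch();
        System.out.println(filter.process(ids)); // [id0, id1, id2, id4, id6, id7]
        System.out.println(filter.process(ids)); // [] (all IDs already seen)
        filter.clear();                          // simulated route restart
        System.out.println(filter.process(ids)); // [id0, id1, id2, id4, id6, id7]
    }
}
```

Within one window every ID passes once, duplicates are dropped, and after the reset the same IDs pass again, which matches the two runs in the log.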

Upvotes: 1
