Andreas

Reputation: 1193

After Camel split Exception won't be escalated

We defined a route in Camel and need to find out whether an exception is thrown in a processor. When we have only one processor, Camel rethrows the exception from the sendBody() method. If there is a preceding split/aggregate, the exception is not rethrown. So the output of the example below is

before throwing Exception

after sendBody

If I omit everything from .split to .completionSize(1) the output is

before throwing Exception

Exception thrown

Any ideas how to find out whether an exception occurred after a split?

private static final String DIRECT_START = "direct:start";

public static void main(String[] args) throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            from(DIRECT_START)
            .split(body())
                .aggregate(constant(true), new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        return oldExchange == null ? newExchange : oldExchange;
                    }
                })
                .completionSize(1)
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    System.out.println("before throwing Exception");
                    exchange.setException(new Exception());
                    throw new Exception("my Exception");
                }
            });
        }});        

    context.start();

    ProducerTemplate producer = context.createProducerTemplate();
    try {
        producer.sendBody(DIRECT_START, Integer.valueOf(42));
        System.out.println("after sendBody");
    } catch (Exception e) {
        System.out.println("Exception thrown");
    }

    context.stop();

}

For checking exceptions afterwards we found a solution: using onException(), we registered an ErrorProcessor that stores a status in the context properties.

But this doesn't interrupt producer.sendBody(..). We have extremely long-running processors that we have to interrupt.

So the question is: can we configure Camel to throw an exception in sendBody, or is it possible to do this in the exception handler?

Upvotes: 2

Views: 6980

Answers (1)

AndyN

Reputation: 2105

There's a good chapter on the Splitter EIP and exception handling in Camel in Action (section 8.3.5) that I highly recommend. The section explains that:

When using a custom AggregationStrategy with the Splitter, it's important to know that you're responsible for handling exceptions. If you don't propagate the exception back, the Splitter will assume you have handled the exception, and ignore it.

You've used the split() method without specifying an aggregator. The Camel documentation states that

The splitter will by default return the original input message

This means that the exchange leaving the split() method does not have an exception, and so no exception is propagated back to your calling code. The exception you throw from the processor is technically inside the splitter. Even though you've used an aggregator, it's not associated with the split call, and you haven't explicitly ended the split with an end(). So when your processor throws an exception, the splitter ignores it as you haven't provided an aggregator to handle and propagate the exception.

We can test this by passing your aggregation strategy to the split call as an argument like so:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .log("test") // inside the split/aggregator EIP
.end() // outside the split/aggregator EIP
.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println("before throwing Exception");
        throw new Exception("my Exception");
    }
});

And you'll get the output:

test
Aggregating
before throwing Exception
Exception thrown

If you want the processor to be inside the split/aggregator EIP, like so:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    })
.end(); // outside the split/aggregator EIP

You'll get the output:

before throwing Exception
Aggregating
Exception thrown

Notice how, inside the split/aggregator EIP, the aggregator runs after the exception is thrown? This is important: without an aggregator to pass on the exception, the splitter will ignore it. For this to work, you need to propagate your exceptions properly inside the aggregator. For instance, in your code, if newExchange were to contain an exception while oldExchange is not null, it would be ignored, as you're not propagating it. You'd need to change the aggregator to add:

// oldExchange is null for the first split part, so guard against it
if (oldExchange != null && newExchange.getException() != null) {
    oldExchange.setException(newExchange.getException());
}
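Put together, the aggregator with that check could look like the sketch below. The class name PropagatingAggregationStrategy is made up for illustration, and the import assumes Camel 2.x, where AggregationStrategy lives in org.apache.camel.processor.aggregate (in Camel 3 and later it is org.apache.camel.AggregationStrategy).

```java
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Hypothetical class name; the package of AggregationStrategy assumes Camel 2.x.
class PropagatingAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // First split part: return it as-is, including any exception it carries.
            return newExchange;
        }
        if (newExchange.getException() != null) {
            // Copy the failure onto the exchange we keep, so it is not silently dropped.
            oldExchange.setException(newExchange.getException());
        }
        return oldExchange;
    }
}
```

It would be passed to the split call in place of the anonymous class, e.g. .split(body(), new PropagatingAggregationStrategy()).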

Note: if you have an onException() call inside the split EIP, and you mark the exception as handled, getException() will no longer return it. So if you want to handle your exceptions but still propagate them through the aggregator, you can read the exception with exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
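As a rough sketch of that idea, an aggregator could read the caught exception from the property and set it on the exchange it returns. The class name HandledExceptionAggregationStrategy is hypothetical, the import again assumes Camel 2.x, and whether the Splitter then rethrows to the caller may depend on your Camel version and error handler configuration, so treat this as a starting point rather than a guaranteed recipe.

```java
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Hypothetical class name; the package of AggregationStrategy assumes Camel 2.x.
class HandledExceptionAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // A handled exception is no longer available via getException(),
        // so look it up in the EXCEPTION_CAUGHT property instead.
        Exception caught = newExchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);

        // Keep the first exchange as the aggregated result, as in the question.
        Exchange target = (oldExchange == null) ? newExchange : oldExchange;

        if (caught != null && target.getException() == null) {
            // Put the failure back on the exchange that leaves the aggregator.
            target.setException(caught);
        }
        return target;
    }
}
```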

You can also use .stopOnException(), like so:

.split(body()).stopOnException()
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    });

This causes the split to stop on an exception, and propagate it. However, when you place an aggregator after the stopOnException(), it no longer works. I'm not entirely sure why. I'm guessing it's because the aggregator alters the exchange object.
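To see the stopOnException() variant from the caller's side, here is a small self-contained sketch based on the main method in the question. The class and method names are made up for illustration, and the imports assume Camel 2.x. ProducerTemplate.sendBody reports failures as a CamelExecutionException, and Camel may wrap the original exception in further layers, so the sketch walks the cause chain to find the exception thrown by the processor.

```java
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

// Hypothetical demo class; DefaultCamelContext's package assumes Camel 2.x.
class StopOnExceptionDemo {

    /** Returns the message of the root cause that reached the caller, or null if sendBody succeeded. */
    static String run() throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:start")
                    .split(body()).stopOnException()
                        .process(new Processor() { // inside the split EIP
                            public void process(Exchange exchange) throws Exception {
                                throw new Exception("my Exception");
                            }
                        })
                    .end();
            }
        });
        context.start();
        try {
            ProducerTemplate producer = context.createProducerTemplate();
            producer.sendBody("direct:start", Integer.valueOf(42));
            return null; // no exception reached the caller
        } catch (CamelExecutionException e) {
            // Unwrap Camel's wrapper exceptions to get to the processor's exception.
            Throwable root = e;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            return root.getMessage();
        } finally {
            context.stop();
        }
    }
}
```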

Also note, you don't need to set the exception in your processor to the exchange. Camel will do that for you when the processor throws the exception. So the line exchange.setException(new Exception()); in the processor isn't necessary.

tl;dr: yes, you can propagate an exception to the calling method from inside a split. You just need to make sure that it's done either through an aggregator associated with the split, or by setting stopOnException(). Which method is best depends on what you are trying to achieve with the splitting, aggregating, and processing.

Upvotes: 18
