R2L2
R2L2

Reputation: 43

How Camel 2.11 batch aggregation works with separate route?

First there is a similar unanswered question Joining routes into single aggregator

We have some consumer routes (ftp, file, smb) reading files from remote systems. Simplified for test with direct route, but similar behavior with batch consumers:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

After transformation all results from one poll are aggregated by batch in a separate route:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

All works fine, if every consumer runs separated. But if multiple consumers runs in parallel, aggregation will split the polls. Example if file-consumer polls 500 messages and a second route starts to read 6 files from ftp the expections is that we get 2 aggregates 1 with 500 messages from file and 1 with 6 messages from ftp.

Testcase:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

The result is: "A+A", "B", "A", "B", "A" and not the expected "A+A+A", "B+B", "A", "Z". Questions:

  1. Is our assumption about aggregation wrong?
  2. How can we achieve the expected behavior?
  3. If we set completionTimeout, it seams that timeout will occur from first exchange - independent if there are still new exchanges?

Upvotes: 4

Views: 3632

Answers (1)

vikingsteve
vikingsteve

Reputation: 40408

You almost have it working. Here is the change you need (and after I will explain).

from("direct:aggregate").id("aggregate")
    .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
    .completionSize(property(Exchange.BATCH_SIZE))
    .to("log:result", "mock:result")

The result will be:

Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A

Note: You won't receive a result for the "Z" since the batch size is 7.

To explain - as you have read, the Aggregator is a versatile camel component and the key things to define correctly are:

  • the aggregation expression
  • the completion rule

Now in your case you are aggregating on a property AGGREGATION_PROPERTY which will be A, B or Z. In addition you are specifying a batch size.

However you aren't expressing a completionSize() in your route. Instead you were using completionFromBatchConsumer - which does something different (the code states that it looks for a Exchange#BATCH_COMPLETE property), thus the weird results.

Anyway, .completionSize(Exchange.BATCH_SIZE) will make your test run as desired.

Good luck further.

Upvotes: 1

Related Questions