audtou
audtou

Reputation: 11

Getting exact response from combined routes

I have a dynamic route creator web application. According to flow design, I produce a camel route. Route may contain multicast, filter, aggregate, processor etc. After designing flow through the UI, my route has been created like this:

from("seda:start").routeId("idx")
    .multicast()
      .to("direct:a", "direct:b", "direct:c")
      .parallelProcessing()
    .end();

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");

from("direct:merge")
    .aggregate(new MyAggregationStrategy()).constant(true).completionSize(3)
    .to("mock:end");

I have an API to give result of this route to the users. When I execute this route with InOut MEP, response is 'C' but mock:end is satisfied with 'ABC':

MockEndpoint mock = getMockEndpoint("mock:end");
    mock.expectedBodiesReceived("ABC");  //works as expected

    String reply = template.requestBody("seda:start", "", String.class);

    assertEquals("ABC", reply);   //it returns 'C', but I expect 'ABC'

    assertMockEndpointsSatisfied();

How can I change the code to get aggregated result with a synchronous call? Here is the code:

public class ResponseTest extends CamelTestSupport {

@Test
public void testAsyncInOut() throws Exception {
    MockEndpoint mock = getMockEndpoint("mock:end");
    mock.expectedBodiesReceived("ABC");  //works as expected

    String reply = template.requestBody("seda:start", "", String.class);

    assertEquals("ABC", reply);   //it returns 'C', but I expect 'ABC'

    assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("seda:start").routeId("idx")
                .multicast()
                    .to("direct:a", "direct:b", "direct:c")
                    .parallelProcessing()
                .end();

            from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");
            from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
            from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");

            from("direct:merge")
                .aggregate(new MyAggregationStrategy()).constant(true).completionSize(3)
                .to("mock:end");
        }
    };
}

class MyAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // this is the first time so no existing aggregated exchange
            return newExchange;
        }

        // append the new word to the existing
        String body = newExchange.getIn().getBody(String.class);
        String existing = oldExchange.getIn().getBody(String.class);

        oldExchange.getIn().setBody(existing + body);
        return oldExchange;
    }
}
}

EDIT

Multicasting message to 5 different endpoints doesn't mean that all the messages will be aggregated later. So I cannot use aggregate strategy in multicast definition. Some of them might be used for another kind of job. Flow definition might be like this: After multicast message to 'a','b','c','d','e' endpoints, 'a' and 'b' can be aggregated together('direct:merge1'), 'c' and 'd' aggregated together('direct:merge2') and 'e' can be used for another thing. And final aggregator will be aggregate 'direct:merge1' and 'direct:merge2' to 'direct:merge3'. All these endpoints are created dynamically('direct:a','direct:b','direct:c','direct:d','direct:e','direct:merge1','direct:merge2','direct:merge3'). This scenario will be created like this:

from("seda:start").routeId("idx")
    .multicast()
        .to("direct:a", "direct:b", "direct:c", "direct:d", "direct:e")
        .parallelProcessing()
    .end();

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge1");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge1");

from("direct:c").transform(constant("C")).delay(3000).to("direct:merge2");
from("direct:d").transform(constant("D")).delay(1000).to("direct:merge2");

from("direct:e").transform(constant("E")).delay(1000).to("mock:anywhere");

from("direct:merge1").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3");
from("direct:merge2").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3");

from("direct:merge3").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("mock:end");

When I send message to seda:start, I expect ABDC but I got 'E'. Is there a way to get final aggregated message('ABDC')? Here is the test method:

@Test
public void testAsyncInOut() throws Exception {
    MockEndpoint mock = getMockEndpoint("mock:end");
    mock.expectedBodiesReceived("ABDC");  //works as expected

    String reply = template.requestBody("seda:start", "", String.class);

    assertEquals("ABDC", reply);   //it returns 'E' because of default multicast behavior, but I expect 'ABDC'

    assertMockEndpointsSatisfied();
}

Upvotes: 1

Views: 118

Answers (1)

anon
anon

Reputation:

From the multicast documentation:

By default Camel will use the last reply as the outgoing message.

If you want to aggregate the results of the multicast into a single message, you state that in the multicast definition.

@Override
public void configure() throws Exception {
  from("seda:start").routeId("idx")
    .multicast(new MyAggregationStrategy()) //Put the Aggregation Strategy here!
      .to("direct:a", "direct:b", "direct:c")
      .parallelProcessing()
    .end();

  from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");                      
  from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
  from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");

  from("direct:merge")
    .to("mock:end");
}

Note that your mock endpoint will now be called 3 times, as the aggregation doesn't occur until later. You would need to modify your test accordingly.

Upvotes: 1

Related Questions