tuk

Reputation: 6872

Camel - Making parallel GET requests and aggregating the results via Dynamic Routes using Java DSL

I am receiving a request on a Jetty HTTP endpoint. The request body contains some URLs. I have to make a GET request to each of those URLs, then aggregate the results of the GET requests and return them to the caller.

Request Body:-

{
    "data" : [
        {"name" : "Hello", "url" : "http://server1"},
        {"name" : "Hello2", "url" : "http://server2"}
    ]
}

One way I can think of doing this is like below:-

from("jetty:http://localhost:8888/hello").process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        // 1. Make the GET request in parallel using ThreadPoolExecutor
        // 2. Wait for all calls to finish. Collate the response
        // 3. Write it to exchange.getOut().setBody
    }
});
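
For comparison, the manual approach outlined in the comments above could be sketched in plain Java roughly as follows. This is only a sketch: the class name ParallelFetch, the fetchAll helper and the pool size cap of 8 are assumptions for illustration, and the actual HTTP call is passed in as a function so that the example runs without network access.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ParallelFetch {

    // Runs the fetcher for every URL on a fixed thread pool and returns the
    // results in the same order as the input list.
    public static List<String> fetchAll(List<String> urls,
                                        final Function<String, String> fetcher) throws Exception {
        // Pool size is an arbitrary choice for this sketch (at least 1, at most 8)
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, Math.min(urls.size(), 8)));
        try {
            List<Callable<String>> tasks = new ArrayList<Callable<String>>();
            for (final String url : urls) {
                tasks.add(() -> fetcher.apply(url));
            }
            List<String> results = new ArrayList<String>();
            // invokeAll blocks until every task has completed and returns the
            // futures in the same order as the submitted tasks
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get()); // a failed task surfaces as ExecutionException
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stub fetcher standing in for a real HTTP GET
        List<String> responses = fetchAll(
                Arrays.asList("http://server1", "http://server2"),
                url -> "response from " + url);
        System.out.println(responses);
    }
}
```

Inside the Processor, the collected list would then be written to the exchange body. This is the amount of plumbing I would like to avoid by using the DSL.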

Can someone tell me whether this can be achieved via the Java DSL using Camel dynamic routes, the Splitter and the Aggregator, so that the Processor remains relatively small?

I am using Camel 2.16.3.

Upvotes: 4

Views: 2975

Answers (2)

Claudius B

Reputation: 86

The answer of @Darius X. is pretty much what you need. To make the backend requests execute in parallel and to aggregate the response bodies into a list of strings, you need to set up an aggregation strategy and set the parallel processing flag on the split definition.

@Override
public void configure() throws Exception {

    from("direct:testMultiple")
            .split(body(), new FlexibleAggregationStrategy<String>()
                    .pick(Builder.body())
                    .castAs(String.class)
                    .accumulateInCollection(ArrayList.class))
            .parallelProcessing()
            // .executorService(<instance of java.util.concurrent.ExecutorService>) // optional: use custom thread pool for parallel processing
            .to("direct:httpClient");

    from("direct:httpClient")
            .log("starting httpClient route")
            .setHeader(Exchange.HTTP_URI, simple("${body}"))
            .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
            .to("http4://google.com")
            .convertBodyTo(String.class)
            .log(LoggingLevel.INFO, "Output was ${body}");
}

The out message of the exchange returned by direct:testMultiple will contain your result list (an ArrayList of strings) as its body.
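
To illustrate what the aggregation step does conceptually, here is a plain-Java model of it. It is not Camel's API: the class ListAggregationModel and its aggregate method are hypothetical and only mirror the shape of an aggregation callback, which receives the accumulated result so far (null on the first call) together with the newest reply.

```java
import java.util.ArrayList;
import java.util.List;

public class ListAggregationModel {

    // Hypothetical stand-in for an aggregation callback: "accumulated" is null
    // the first time, afterwards it is the list returned by the previous call.
    public static List<String> aggregate(List<String> accumulated, String newBody) {
        List<String> result = (accumulated == null) ? new ArrayList<String>() : accumulated;
        result.add(newBody);
        return result;
    }

    public static void main(String[] args) {
        List<String> bodies = null;
        // Each split reply is folded into the list, one at a time
        for (String reply : new String[] {"reply from server1", "reply from server2"}) {
            bodies = aggregate(bodies, reply);
        }
        System.out.println(bodies);
    }
}
```

In the route above, FlexibleAggregationStrategy plays this role, so you do not have to write the callback yourself.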

Upvotes: 3

Darius X.

Reputation: 2937

The steps would be:

  1. Split the incoming data into URLs. This might involve sub-steps: for instance, you might unmarshal the incoming JSON string into some POJO and perhaps this POJO has an array or list where each entry is a URL. Then, you could pass this along as the body. (Of course, you might need other information from the incoming request, so you might vary this.)
  2. The splitter can split the body easily if it is an array or another type that it handles natively. In the simplest case, the splitter would pass on a single URI in the body of each split message.
  3. Next, within the splitter's flow, you could have a producer like http4, but instead of using the URI on the endpoint, you could ask it to use the URI in the message by setting the Exchange.HTTP_URI header, which overrides the endpoint URI.
  4. Finally, you would have an aggregator.
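
As a rough sketch of step 1, assuming the JSON from the question has already been unmarshalled into a POJO, the URL list could be extracted like this. The class names UrlRequest and Entry and the urls() helper are hypothetical; the field names simply follow the request body shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UrlRequest {

    // One element of the "data" array in the request body
    public static class Entry {
        public String name;
        public String url;

        public Entry() { } // no-arg constructor for JSON data-binding libraries

        public Entry(String name, String url) {
            this.name = name;
            this.url = url;
        }
    }

    public List<Entry> data = new ArrayList<Entry>();

    // Returns only the URLs, in request order, so the list can be used as the
    // message body that the splitter works on.
    public List<String> urls() {
        List<String> result = new ArrayList<String>();
        for (Entry entry : data) {
            result.add(entry.url);
        }
        return result;
    }

    public static void main(String[] args) {
        UrlRequest request = new UrlRequest();
        request.data = Arrays.asList(
                new Entry("Hello", "http://server1"),
                new Entry("Hello2", "http://server2"));
        System.out.println(request.urls());
    }
}
```

In a route, the unmarshalling itself would typically be done by a JSON data format (for example Jackson, if camel-jackson is on the classpath), followed by a small step that replaces the body with the result of urls().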

It sounds like the core of your question is about the dynamic URI. This is how a code snippet might look:

 from(...)... etc. 
    .setHeader(Exchange.HTTP_URI, simple("${body}"))
    .setHeader(Exchange.HTTP_METHOD,  
              constant(org.apache.camel.component.http4.HttpMethods.GET))
    .to("http4://google.com")

For a small working demo, see this class.

public class HttpDynamicClient extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("direct:testMultiple")
        .split(body())
        .to("direct:httpClient");

        from("direct:httpClient")
        .log("starting httpClient route")
        .setHeader(Exchange.HTTP_URI, simple("${body}"))
        .setHeader(Exchange.HTTP_METHOD, constant(org.apache.camel.component.http4.HttpMethods.GET))
        .to("http4://google.com")
        .process(new BodyToStringConverter())
        .log(LoggingLevel.INFO, "Output was ${body}");
    }

    private static class BodyToStringConverter implements Processor {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody(exchange.getIn().getBody(String.class));
        }
    }


    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        try {
            Logger logger = Logger.getLogger(HttpDynamicClient.class);

            context.addRoutes(new HttpDynamicClient());
            ProducerTemplate template = context.createProducerTemplate();
            context.start();
            Thread.sleep(1000);


            template.sendBody("direct:httpClient", "http://jsonplaceholder.typicode.com/posts/1");
            Thread.sleep(2000);

            template.sendBody("direct:testMultiple", new String [] {"http://jsonplaceholder.typicode.com/posts/1" , "http://jsonplaceholder.typicode.com/posts/1"});


        } finally {
            context.stop();
        }
    }

}

Upvotes: 2
