techcraver

Reputation: 411

Read from a paginated API in an Apache Camel route

How can I read from a paginated REST API endpoint, or from a JDBC SQL query that fetches "K" items/records at a time, using the Apache Camel DSL? I would appreciate a clean example.

Thanks in advance.

Upvotes: 2

Views: 3455

Answers (2)

Lorenzo Gardoni

Reputation: 31

A simpler way with the DSL:

.loopDoWhile(exchangeProperty(PropertyUtilsEnum.IS_LAST.getLabel()).isEqualTo(false))
    .log("Loop page: " + PropertyUtilsEnum.PAGE.getPropertyValue())
    .to(direct(GET_PAGE_MS))
    .setProperty(PropertyUtilsEnum.IS_LAST.getLabel()).jsonpath("$.last", true)
    .setProperty(PropertyUtilsEnum.PAGE.getLabel()).jsonpath("$.pageable.page", true)
    .process(this::incrementPage)
.end() // loop
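
The route refers to `this::incrementPage`, which is not shown. A minimal sketch of what such a step might do is given below. To keep it runnable without Camel on the classpath, a plain `Map` stands in for the exchange properties, and the keys `"page"` and `"isLast"` are hypothetical stand-ins for the `PropertyUtilsEnum` labels. In the real route the same logic would presumably read and write the properties through `exchange.getProperty(...)` and `exchange.setProperty(...)`.

```java
import java.util.HashMap;
import java.util.Map;

public class PageCursor {
    // Hypothetical property names; the original uses PropertyUtilsEnum labels.
    static final String PAGE = "page";
    static final String IS_LAST = "isLast";

    /** Advances the page counter by one unless the last page has been reached. */
    static void incrementPage(Map<String, Object> properties) {
        boolean last = Boolean.TRUE.equals(properties.get(IS_LAST));
        if (!last) {
            int page = (Integer) properties.getOrDefault(PAGE, 0);
            properties.put(PAGE, page + 1);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(PAGE, 0);
        properties.put(IS_LAST, false);
        incrementPage(properties);
        System.out.println("Next page: " + properties.get(PAGE));
    }
}
```

Leaving the counter untouched once the last page is seen means the final property value is the index of the last page actually fetched.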

Upvotes: 0

techcraver

Reputation: 411

I did this using the loopDoWhile DSL:

from("direct:start")
    .loopDoWhile(stopLoopPredicate())
        .to("bean:restAPIProcessor")
        .to("bean:dataEnricherBean")
    .end();

The stopLoopPredicate() method is:

public Predicate stopLoopPredicate() {
    // Despite its name, this returns true while the loop should keep running:
    // the body must be non-null and must not be the "stopLoop" sentinel.
    return new Predicate() {
        @Override
        public boolean matches(Exchange exchange) {
            Object body = exchange.getIn().getBody();
            return body != null && !body.toString().equalsIgnoreCase("stopLoop");
        }
    };
}

restAPIProcessor is a Processor implementation that makes the REST API call.

The pagination logic lives in restAPIProcessor: it tracks the offset in an exchange header, and as soon as the REST API returns a null or empty response it sets the exchange body to "stopLoop", which makes the predicate return false and ends the loop. This works well for me. Here is the code for RestAPIProcessor:

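import java.util.LinkedHashMap;
import java.util.Map;

// @Inject is assumed to be javax.inject.Inject; adjust if another DI framework is used.
import javax.inject.Inject;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class RestAPIProcessor implements Processor {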

    @Inject
    private RestTemplate restTemplate;

    private static final int LIMIT = 100;

    private static final String REST_API = "<REST API URL>";

    @Override
    public void process(Exchange exchange) throws Exception {
        Integer offset = (Integer) exchange.getIn().getHeader("offset");
        Integer count = (Integer) exchange.getIn().getHeader("count");

        if (offset == null) offset = 0;
        if (count == null) count = 0;

        String response = "";
        Map<String,Object> body = new LinkedHashMap<>();
        body.put("offset",offset++);
        body.put("limit",LIMIT);
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<?> entity = new HttpEntity<Object>(body,headers);
        ResponseEntity<String> countResponseEntity = restTemplate.exchange(REST_API, HttpMethod.POST,entity,String.class);
        response = countResponseEntity.getBody();
        count += LIMIT;
        if (response == null || response.isEmpty()) {
            // No more data: set the sentinel that the loop predicate checks.
            // Only the In message is modified; calling getOut() here would
            // create a new, empty message and drop the body just set.
            exchange.getIn().setBody("stopLoop");
        } else {
            // Carry the paging state forward to the next iteration.
            exchange.getIn().setHeader("count", count);
            exchange.getIn().setHeader("offset", offset);
            exchange.getIn().setBody(response);
        }
    }
}
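
The control flow of this approach (fetch a page, advance the offset, stop on an empty response) can be tried without Camel or a live endpoint. The following is only a sketch under stated assumptions: `readAll` and `fakeEndpoint` are hypothetical names, an in-memory list of 250 integers stands in for the remote data, and the offset advances by the number of records received, whereas the processor above increments it by one per call. Which of the two is right depends on how the actual API interprets "offset".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class PagedReader {

    /**
     * Calls fetchPage(offset, limit) repeatedly and collects the results,
     * stopping when a null or empty page comes back.
     */
    static <T> List<T> readAll(BiFunction<Integer, Integer, List<T>> fetchPage, int limit) {
        List<T> all = new ArrayList<>();
        int offset = 0;
        while (true) {
            List<T> page = fetchPage.apply(offset, limit);
            if (page == null || page.isEmpty()) {
                break; // same role as the "stopLoop" sentinel
            }
            all.addAll(page);
            offset += page.size(); // assumption: offset counts records, not pages
        }
        return all;
    }

    /** Hypothetical stand-in for the REST call: serves slices of 250 records. */
    static List<Integer> fakeEndpoint(int offset, int limit) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            data.add(i);
        }
        int from = Math.min(offset, data.size());
        int to = Math.min(offset + limit, data.size());
        return data.subList(from, to);
    }

    public static void main(String[] args) {
        List<Integer> records = readAll(PagedReader::fakeEndpoint, 100);
        System.out.println("Fetched " + records.size() + " records");
    }
}
```

With a limit of 100 the fake endpoint is called four times: three pages with data (100, 100, 50 records) and one empty page that ends the loop.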

Upvotes: 6
