grinch
grinch

Reputation: 814

How to convert message body to pojo in AggregationStrategy

I've got a route (camel 2.17.3) that uses the enrich DSL to call into a rest service and aggregate results into the message body. I'm running into problems with serialization, though. Here is what I'm trying. My route looks like this:

rest("myService").produces("application/json")
            .get("test")
            .param().name("text").required(true).type(RestParamType.query).description("The text to be processed.").endParam()
                .to("direct:StepA")
    ;

    from("direct:StepA")
            .removeHeader("CamelHttpQuery")
            .removeHeader("CamelHttpRawQuery")
            .enrich("netty4-http://myOtherService:8080/endpoint?input=${header.text}", new MyAggregator())
            .to("direct:StepB")
    ;

    from("direct:StepB")
            // not really implemented yet
            .transform().simple("Query: ${header.text}\nBody: ${body}")
    ;

As you see, I want to use the enrich() DSL to call out to some existing service and aggregate those results in to form a new message body for further processing. I need to transform the result of that rest call from Json to MyResponse. I'd like to use this:

.unmarshal().json(JsonLibrary.Jackson, MyResponse.class)

But I need it already unmarshalled within my AggregationStrategy, which looks like this:

public class MyAggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        final MyResponse intermediateResponse = oldExchange.getIn().getBody(MyResponse.class);

        final MyAggregate response = new MyAggregate(oldExchange.getIn().getHeader("text", String.class), intermediateResponse);

        newExchange.getIn().setBody(response);
        return newExchange;
}

This returns null, however, when I say getBody(..). So I learned about TypeConverters and tried to use that so the system could automagically convert to my type:

public class MyConverter extends TypeConverterSupport {

    private static final ObjectMapper MAPPER =
        new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);

    @Override
    public <T> T convertTo(Class<T> type, Exchange exchange, Object value) throws TypeConversionException {
        try {
            return (T)MAPPER.readValue(value.toString(), MyResponse.class);
        } catch (IOException e) {
           throw new TypeConversionException(value, type, e);
        }
}

And then in my main:

    public static void main(String... args) throws Exception {
        Main main = new Main();
        main.setApplicationContextUri("classpath:app-ctx.xml");

        for (final CamelContext ctx : main.getCamelContexts()) {
           ctx.getTypeConverterRegistry().addTypeConverter(MyResponse.class, String.class, new MyConverter());
        }

        main.run(args);
    }

But this doesn't appear to change the result. In my Aggregator the MyResponse is still null. I'm missing something on how I need to set up this type conversion and hoping someone could point me in the right direction.

Upvotes: 0

Views: 2131

Answers (1)

grinch
grinch

Reputation: 814

Ok I ended up working through this when I realized that I could just enrich a sub-route that did the unmarshall call, and then aggregate that ...

from("direct:StepA")
        .removeHeader("CamelHttpQuery")
        .removeHeader("CamelHttpRawQuery")
        .enrich("direct:StepAEnricher", new MyAggregator())
        .to("direct:StepB")
;

from("direct:StepAEnricher")
    .toD("netty4-http://myOtherService:8080/endpoint?input=${header.text}")
    .unmarshal().json(JsonLibrary.Jackson, MyResponse.class)
;

This works perfectly! I am curious how a more seasoned camel developer would think about this. Is it costly to have this "extra" route to just handle unmarshalling a response like this? I'm still curious how I would have overcome the custom type converter approach.

Upvotes: 0

Related Questions