denis

Reputation: 87

Camel SQS unmarshal

I have a very simple Camel route that reads from SQS and base64-decodes the message body:

 from("aws-sqs://test-camel-start?amazonSQSClient=#sqsClient&concurrentConsumers=2&maxMessagesPerPoll=1")
            .unmarshal()
            .base64()
            .process(new BaseProcessor())
            .to("aws-sqs://test-camel-success?amazonSQSClient=#sqsClient").end();

My test Processor:

 public boolean process(Exchange exchange, AsyncCallback callback) {
    try {
        String header = (String) exchange.getIn().getHeader("CamelAwsSqsMessageId");
        String message = exchange.getIn().getBody(String.class);
        MDC.put("message-id", header);
        Thread.sleep(5 * 1000);
        LOG.info("Async ping");
        LOG.info(message);
    } catch (Throwable e) {
        LOG.error(e.getMessage(), e);
        exchange.setException(e);
    }
    callback.done(false);
    return false;
}

So I have the following situation:

1) Camel successfully reads a message from the "test-camel-start" queue.
2) BaseProcessor successfully processes the message.
3) Delivery of the message to "test-camel-success" then fails.

I get the following log trace:

637435 [Camel (HomeCamelSQS) thread #2 - aws-sqs://test-camel-start] ERROR o.a.c.processor.DefaultErrorHandler cr.id=18b85e94-bc2f-44dd-baef-4bcd4fb79e0c - Failed delivery for (MessageId: ID-DESKTOP-5SBC4FA-1522239559145-0-6 on ExchangeId: ID-DESKTOP-5SBC4FA-1522239559145-0-5). Exhausted after delivery attempt: 1 caught: com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageBody. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: 85604fc2-0724-5d16-af6b-1cf78a24e8b0)

Message History

RouteId    ProcessorId    Processor    Elapsed (ms)
[route1 ] [route1 ] [aws-sqs://test-camel-start?amazonSQSClient=%23sqsClient&concurrentConsumers=2&] [ 7591]
[route1 ] [unmarshal1 ] [unmarshal[org.apache.camel.model.dataformat.Base64DataFormat@237080aa] ] [ 1]
[route1 ] [process1 ] [Processor@0x5007bde9 ] [ 7417]
[route1 ] [to1 ] [aws-sqs://test-camel-success?amazonSQSClient=#sqsClient ] [ 172]

Stacktrace

com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageBody. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: 85604fc2-0724-5d16-af6b-1cf78a24e8b0)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1638)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1303)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2013)
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1989)
    at com.amazonaws.services.sqs.AmazonSQSClient.executeSendMessage(AmazonSQSClient.java:1594)
    at com.amazonaws.services.sqs.AmazonSQSClient.sendMessage(AmazonSQSClient.java:1571)
    at org.apache.camel.component.aws.sqs.SqsProducer.process(SqsProducer.java:62)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)
    at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:157)
    at org.apache.camel.processor.CamelInternalProcessor$InternalCallback.done(CamelInternalProcessor.java:262)
    at org.apache.camel.processor.RedeliveryErrorHandler$2.done(RedeliveryErrorHandler.java:560)
    at com.home.camel_poligon.BaseProcessor.process(BaseProcessor.java:35)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.component.aws.sqs.SqsConsumer.processBatch(SqsConsumer.java:206)
    at org.apache.camel.component.aws.sqs.SqsConsumer.poll(SqsConsumer.java:111)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The log says "The request must contain the parameter MessageBody", but I have no idea why the message body is missing.

Upvotes: 3

Views: 723

Answers (1)

Claus Ibsen

Reputation: 55750

See this FAQ: http://camel.apache.org/why-is-my-message-body-empty.html

In your processor you convert the message body to a String so that you can log it. Because the message body is stream-based (e.g. the output of the base64 unmarshal), that conversion reads the body to the end of the stream, and afterwards the message is seen as empty.
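To see the effect outside Camel, here is a minimal plain-Java sketch. It is only an illustration: the class name StreamBodyDemo and its helper methods are made up for this example, and the JDK's Base64 decoding stream stands in for whatever stream Camel actually puts in the body. The point is just that a stream can be read once, so a second read returns nothing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class StreamBodyDemo {

    // Reads everything that is still left in the stream and returns it as text.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Builds a stream that base64-decodes on the fly (a stand-in for the route's decoded body).
    static InputStream decodedBody(String text) {
        byte[] encoded = Base64.getEncoder().encode(text.getBytes(StandardCharsets.UTF_8));
        return Base64.getDecoder().wrap(new ByteArrayInputStream(encoded));
    }

    public static void main(String[] args) throws IOException {
        InputStream body = decodedBody("hello");
        String first = readAll(body);   // like the logging call in the processor
        String second = readAll(body);  // like the producer reading the body afterwards
        System.out.println("first=[" + first + "] second=[" + second + "]");
    }
}
```

The first read gets the decoded text; the second read finds the stream already at its end, which matches the empty MessageBody that SQS rejects.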

You can turn on stream caching, or don't do this manual logging in your processor.
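As a rough sketch of the first option, stream caching can be enabled on the route with the streamCaching() DSL method. The class name SqsRoute is made up for this example; the endpoints and BaseProcessor are taken from the question. This assumes the Camel 2.x Java DSL used there, and exactly where the cache is applied in the route can differ between Camel versions, so check that the body still arrives at the second queue.

```java
import org.apache.camel.builder.RouteBuilder;

// Hypothetical route class; only the streamCaching() call differs from the route in the question.
public class SqsRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("aws-sqs://test-camel-start?amazonSQSClient=#sqsClient&concurrentConsumers=2&maxMessagesPerPoll=1")
            .streamCaching()              // cache stream-based bodies so they can be read more than once
            .unmarshal().base64()
            .process(new BaseProcessor()) // BaseProcessor from the question, unchanged
            .to("aws-sqs://test-camel-success?amazonSQSClient=#sqsClient");
    }
}
```

Stream caching can also be switched on for the whole context with getContext().setStreamCaching(true) inside configure(), if you would rather not set it per route.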

Upvotes: 3
