whomer
whomer

Reputation: 615

Camel Kafka serialization error

I am new to camel and kafka. I am using Camel 2.18.2 with Kafka 0.10.1.1

I am getting this error and don't understand why:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

I have the following route:

from("direct://toEnrichEmail") 
.routeId(routeId).marshal().json(JsonLibrary.Jackson, Map.class) 
.log(LoggingLevel.INFO, "Sending to Kafka: ${body}") 
.to("kafka:localhost:9092?topic=enrich-email&requestRequiredAcks=-1"); 

The code that actually sends to the route:

ProducerTemplate template = kafkaProducerFactory.getProducerTemplate(); 
            logger.debug("Sending message type: {}, to uri: {}, route: {}", wimsConfiguration.getMessageType(), 
                            wimsConfiguration.getDirectUri(), wimsConfiguration.getRouteName()); 
            Map<String,Object>headers = new HashMap<>(); // added because the examples do 
            headers.put(KafkaConstants.PARTITION_KEY, 0); 
            headers.put(KafkaConstants.KEY, "1"); 
            template.sendBodyAndHeaders(wimsConfiguration.getDirectUri(), wimsConfiguration.getWimsMessage(), headers); 

The log message in the route shows that the message is a proper JSON string, the default serializer is string, So why is it complaining that it cannot serialize?

I looked through the camel kafka component test cases and this looks like it should work

Here is a sample of the output from Camel/kafka logs:

16:32:20,967 INFO [toEnrichEmail] (default task-12) Sending to Kafka: {"messageType":"orderCreate","regionCode":"IN","regionLanguage":["en"],"orderHeader":{"order":"3001357952","salesOrg":"2123","soldTo":"2035266752","currency":"INR","documentType":"TA","validFrom":null,"validTo":null,"contactName":"UI IN","contactPhone":"","contactEmail":"[email protected]"},"shipTo":{"customerNumber":"2035266752","companyName":"INTERNET PRICE FOR EOU","cityName":"Bangalore","district":"","postalCode":"","streetName":"","houseNumber":"","building":"","floor":"","roomNumber":"","countryName":"India","regionName":"Karnataka"},"billTo":{"customerNumber":"2035266752","companyName":"INTERNET PRICE FOR EOU","cityName":"Indi","district":"","postalCode":"","streetName":"","houseNumber":"","building":"","floor":"","roomNumber":"","countryName":"India","regionName":"Karnataka"},"paymentInfo":{"paymentMethod":"PO","purchaseOrder":"1234567","requisitionNumber":"","creditCardNumber":"","paymentterms":"","incoterms":""},"orderSummary":{"orderSubtotal":"6479.52","discounts":"0","shippingTransportation":"0","salesTax":"1187.92","dutyTotal":"1712.56","orderTotal":"9380"},"orderItems":[{"quoteNumber":null,"quoteItemNumber":"000010","totalReservedQunatity":null,"remainingReservedQuantity":null,"lineItemNo":"000010","itemCategory":"TAN","qty":"1","material":"T6066-1KG","product":"T6066","brandId":"SIGMA","description":"TRIZMA(R) BASE, BIOPERFORMANCE CERTIF&","yourRef":"","yourPrice":"8192.08","listPrice":"8192.08"}]} 16:32:21,008 ERROR [org.apache.camel.processor.DefaultErrorHandler] (default task-12) Failed delivery for (MessageId: ID-STLDEEPX06-sial-com-49604-1486506685312-1-1 on ExchangeId: ID-STLDEEPX06-sial-com-49604-1486506685312-1-2). Exhausted after delivery attempt: 1 caught: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Blockquote

Message History

RouteId ProcessorId Processor Elapsed (ms) [toEnrichEmail ] [toEnrichEmail ] [direct://toEnrichEmail ] [ 57] [toEnrichEmail ] [marshal1 ] [marshal[org.apache.camel.model.dataformat.JsonDataFormat@142a2fec] ] [ 10] [toEnrichEmail ] [log4 ] [log ] [ 15] [toEnrichEmail ] [to3 ] [kafka:localhost:9092?topic=enrich-email&requestRequiredAcks=-1 ] [ 25]

Stacktrace ---------------------------------------------------------------------------------------------------------------------------------------: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

16:32:21,011 INFO [stdout] (default task-12) 2017-02-07T16:32:21,010 INFO com.sial.notifications.common.rest.NotificationExceptionResponseMapper - Mapping Exception org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-STLDEEPX06-sial-com-49604-1486506685312-1-2] 16:32:21,021 INFO [stdout] (default task-12) 2017-02-07T16:32:21,014 ERROR com.sial.notifications.common.rest.NotificationExceptionResponseMapper - Mapped exception org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-STLDEEPX06-sial-com-49604-1486506685312-1-2] 16:32:21,021 INFO [stdout] (default task-12) org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-STLDEEPX06-sial-com-49604-1486506685312-1-2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1779) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.util.ExchangeHelper.extractResultBody(ExchangeHelper.java:677) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:515) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:511) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.sendBodyAndHeaders(DefaultProducerTemplate.java:259) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.sendBodyAndHeaders(DefaultProducerTemplate.java:253) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at com.sial.notifications.messages.rest.NotificationMessageServiceImpl.submitMessageToRoute(NotificationMessageServiceImpl.java:66) ~[classes:?]

Upvotes: 1

Views: 3646

Answers (2)

whomer
whomer

Reputation: 615

The solution was to add .convertBodyTo(String.class) after marshaling the class to JSON. That shouldn't be necessary, but doing this made Kafka happy.

Upvotes: 1

so-random-dude
so-random-dude

Reputation: 16465

add serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer to the producer config

Or

add .convertBodyTo(String.class) after marshaling the object to JSON as you said

I would prefer the former one as it's one less operation.

Upvotes: 2

Related Questions