Reputation: 615
I am new to camel and kafka. I am using Camel 2.18.2 with Kafka 0.10.1.1
I am getting this error and don't understand why:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
I have the following route:
from("direct://toEnrichEmail")
.routeId(routeId).marshal().json(JsonLibrary.Jackson, Map.class)
.log(LoggingLevel.INFO, "Sending to Kafka: ${body}")
.to("kafka:localhost:9092?topic=enrich-email&requestRequiredAcks=-1");
The code that actually sends to the route:
ProducerTemplate template = kafkaProducerFactory.getProducerTemplate();
logger.debug("Sending message type: {}, to uri: {}, route: {}", wimsConfiguration.getMessageType(),
wimsConfiguration.getDirectUri(), wimsConfiguration.getRouteName());
Map<String,Object>headers = new HashMap<>(); // added because the examples do
headers.put(KafkaConstants.PARTITION_KEY, 0);
headers.put(KafkaConstants.KEY, "1");
template.sendBodyAndHeaders(wimsConfiguration.getDirectUri(), wimsConfiguration.getWimsMessage(), headers);
The log message in the route shows that the message is a proper JSON string, the default serializer is string, So why is it complaining that it cannot serialize?
I looked through the camel kafka component test cases and this looks like it should work
Here is a sample of the output from Camel/kafka logs:
16:32:20,967 INFO [toEnrichEmail] (default task-12) Sending to Kafka: {"messageType":"orderCreate","regionCode":"IN","regionLanguage":["en"],"orderHeader":{"order":"3001357952","salesOrg":"2123","soldTo":"2035266752","currency":"INR","documentType":"TA","validFrom":null,"validTo":null,"contactName":"UI IN","contactPhone":"","contactEmail":"[email protected]"},"shipTo":{"customerNumber":"2035266752","companyName":"INTERNET PRICE FOR EOU","cityName":"Bangalore","district":"","postalCode":"","streetName":"","houseNumber":"","building":"","floor":"","roomNumber":"","countryName":"India","regionName":"Karnataka"},"billTo":{"customerNumber":"2035266752","companyName":"INTERNET PRICE FOR EOU","cityName":"Indi","district":"","postalCode":"","streetName":"","houseNumber":"","building":"","floor":"","roomNumber":"","countryName":"India","regionName":"Karnataka"},"paymentInfo":{"paymentMethod":"PO","purchaseOrder":"1234567","requisitionNumber":"","creditCardNumber":"","paymentterms":"","incoterms":""},"orderSummary":{"orderSubtotal":"6479.52","discounts":"0","shippingTransportation":"0","salesTax":"1187.92","dutyTotal":"1712.56","orderTotal":"9380"},"orderItems":[{"quoteNumber":null,"quoteItemNumber":"000010","totalReservedQunatity":null,"remainingReservedQuantity":null,"lineItemNo":"000010","itemCategory":"TAN","qty":"1","material":"T6066-1KG","product":"T6066","brandId":"SIGMA","description":"TRIZMA(R) BASE, BIOPERFORMANCE CERTIF&","yourRef":"","yourPrice":"8192.08","listPrice":"8192.08"}]} 16:32:21,008 ERROR [org.apache.camel.processor.DefaultErrorHandler] (default task-12) Failed delivery for (MessageId: ID-STLDEEPX06-sial-com-49604-1486506685312-1-1 on ExchangeId: ID-STLDEEPX06-sial-com-49604-1486506685312-1-2). Exhausted after delivery attempt: 1 caught: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Blockquote
RouteId ProcessorId Processor Elapsed (ms) [toEnrichEmail ] [toEnrichEmail ] [direct://toEnrichEmail ] [ 57] [toEnrichEmail ] [marshal1 ] [marshal[org.apache.camel.model.dataformat.JsonDataFormat@142a2fec] ] [ 10] [toEnrichEmail ] [log4 ] [log ] [ 15] [toEnrichEmail ] [to3 ] [kafka:localhost:9092?topic=enrich-email&requestRequiredAcks=-1 ] [ 25]
Stacktrace ---------------------------------------------------------------------------------------------------------------------------------------: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
16:32:21,011 INFO [stdout] (default task-12) 2017-02-07T16:32:21,010 INFO com.sial.notifications.common.rest.NotificationExceptionResponseMapper - Mapping Exception org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-STLDEEPX06-sial-com-49604-1486506685312-1-2] 16:32:21,021 INFO [stdout] (default task-12) 2017-02-07T16:32:21,014 ERROR com.sial.notifications.common.rest.NotificationExceptionResponseMapper - Mapped exception org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-STLDEEPX06-sial-com-49604-1486506685312-1-2] 16:32:21,021 INFO [stdout] (default task-12) org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-STLDEEPX06-sial-com-49604-1486506685312-1-2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1779) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.util.ExchangeHelper.extractResultBody(ExchangeHelper.java:677) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:515) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:511) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.sendBodyAndHeaders(DefaultProducerTemplate.java:259) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at org.apache.camel.impl.DefaultProducerTemplate.sendBodyAndHeaders(DefaultProducerTemplate.java:253) ~[camel-core-2.18.2.jar:2.18.2] 16:32:21,021 INFO [stdout] (default task-12) at com.sial.notifications.messages.rest.NotificationMessageServiceImpl.submitMessageToRoute(NotificationMessageServiceImpl.java:66) ~[classes:?]
Upvotes: 1
Views: 3646
Reputation: 615
The solution was to add .convertBodyTo(String.class)
after marshaling the class to JSON. That shouldn't be necessary, but doing this made Kafka happy.
Upvotes: 1
Reputation: 16465
add serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer
to the producer config
Or
add .convertBodyTo(String.class)
after marshaling the object to JSON as you said
I would prefer the former one as it's one less operation.
Upvotes: 2