Reputation: 2467
I am new to Kafka. I am trying to send a message containing a header and a payload to a Kafka topic.
Below is the error:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.cabservice.request.CabLocationPayload to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class com.cabservice.request.CabLocationPayload cannot be cast to class java.lang.String
Payload:
{
  "header": {
    "eventName": "CAB-LOCATION",
    "eventId": "3b1i333kiwoskl",
    "timestamp": 1615205167470
  },
  "payload": {
    "cabId": "cc8",
    "driverId": "[email protected]",
    "geoLocation": {
      "id": "1234",
      "latitude": 78.12,
      "longitude": 45.23
    }
  }
}
I have a CabLocationPayload class with header and payload fields:
public class CabLocationPayload {
    private Header header;
    private Payload payload;
    // getters and setters
}
In the controller:
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestBody CabLocationPayload cabLocationPayload) {
Header and Payload have fields that map to the JSON above.
After changing VALUE_SERIALIZER_CLASS_CONFIG in the producer, I am able to see the data, but it still fails with a ClassCastException.
public class KafkaConfiguration {
    @Bean
    public ProducerFactory<String, String> producerFactoryString() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplateString() {
        return new KafkaTemplate<>(producerFactoryString());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Current error is:
2 09:41:20.108 INFO 22561 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: lWghv-b_RG-_hO-qOp_cjA
2021-04-22 09:41:20.123 ERROR 22561 --- [nio-9080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.cabservice.request.CabLocationPayload to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer] with root cause
java.lang.ClassCastException: class com.cabservice.request.CabLocationPayload cannot be cast to class java.lang.String (com.cabservice.request.CabLocationPayload is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @1144043d; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.6.0.jar:na]
Any help is really appreciated.
Upvotes: 0
Views: 2369
Reputation: 363
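The Kafka 'value.serializer' config should be a Serializer implementation that can handle your object type, not a serializer for a different type (such as StringSerializer) and not your object type itself.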
For example
key: VALUE_SERIALIZER_CLASS_CONFIG, value: JsonSerializer.class (source: org.springframework.kafka.support.serializer)
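To illustrate why a StringSerializer fails on a CabLocationPayload, here is a minimal JDK-only sketch. The Serializer interface, the StringSerializer class, and the one-field CabLocationPayload below are simplified stand-ins written for this example, not the real Kafka or application classes. The point it tries to show is that generics are erased at runtime, so the mismatch only surfaces as a ClassCastException when serialize is actually called. That is also consistent with the stack trace in the question, which still names StringSerializer.serialize, suggesting the producer that sent the message was still using it.

```java
import java.nio.charset.StandardCharsets;

class SerializerMismatchDemo {
    // Hypothetical stand-in for Kafka's Serializer<T> interface (assumption, simplified).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical stand-in for a serializer that only understands String values.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for the payload class from the question, reduced to one field.
    static class CabLocationPayload {
        String cabId = "cc8";
    }

    // The serializer is used through its raw type, much like a client that
    // instantiates it reflectively from a config value. The compiler cannot
    // check the value type here, so a mismatch shows up only at runtime.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Serializer serializer, Object value) {
        try {
            serializer.serialize("some-topic", value);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A non-String value handed to a String-only serializer fails at runtime.
        System.out.println(trySerialize(new StringSerializer(), new CabLocationPayload())); // prints "ClassCastException"
        // A String value works.
        System.out.println(trySerialize(new StringSerializer(), "hello")); // prints "ok"
    }
}
```

In the real application the fix is not to catch the exception but to make sure the producer that actually sends the message is configured with a serializer that accepts the payload type, such as JsonSerializer.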
Example producer config:
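import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@EnableKafka
@Configuration
public class KafkaProducerConfiguration {
    // The value type is Object so the template accepts any payload JsonSerializer can handle.
    @Bean
    KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(getConfig());
    }

    private Map<String, Object> getConfig() {
        Map<String, Object> config = new HashMap<>();
        // Replace "brokers" with your broker list, e.g. "localhost:9092".
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokers");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }
}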
Example consumer config:
Replace YourClass with the class you want to consume (in this example, CabLocationPayload).
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfiguration {
    private Map<String, Object> consumerConfigs() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your brokers");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-id");
        return props;
    }

    @Bean
    public ConsumerFactory<String, YourClass> kafkaListenerConsumerFactory() {
        // Deserialize values as YourClass; the second argument (false) ignores type headers.
        final ErrorHandlingDeserializer<YourClass> errorHandlingDeserializer =
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(YourClass.class, false));
        return new DefaultKafkaConsumerFactory<>(this.consumerConfigs(), new StringDeserializer(), errorHandlingDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, YourClass> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, YourClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.kafkaListenerConsumerFactory());
        return factory;
    }
}
Upvotes: 1
Reputation: 2467
I moved the Kafka configs to application.properties:
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
I am not sure whether spring.kafka.consumer.properties.spring.json.trusted.packages=* is what makes the difference.
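As far as I understand it, the trusted-packages setting matters on the consumer side: when the JSON deserializer resolves the target class from type information sent with the record, it only accepts classes from packages it has been told to trust, and * means trust everything. The sketch below is a deliberately simplified, JDK-only illustration of that idea. It is not Spring Kafka's actual implementation, and the class and method names (TrustedPackagesSketch, isTrusted) are made up for this example.

```java
import java.util.Set;

class TrustedPackagesSketch {
    // Simplified check (assumption, not Spring's real code): a class is accepted
    // if the wildcard "*" is configured or its package is listed explicitly.
    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        String payloadClass = "com.cabservice.request.CabLocationPayload";
        // With the wildcard, every package is trusted.
        System.out.println(isTrusted(payloadClass, Set.of("*"))); // prints "true"
        // With only JDK packages listed, the application package is rejected.
        System.out.println(isTrusted(payloadClass, Set.of("java.util", "java.lang"))); // prints "false"
        // Listing the payload's own package is a narrower alternative to "*".
        System.out.println(isTrusted(payloadClass, Set.of("com.cabservice.request"))); // prints "true"
    }
}
```

If that reading is right, setting the property to com.cabservice.request instead of * would be a tighter option, though I have not verified it against this project.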
Upvotes: 0