Reputation: 2699
We are considering to use Kafka in our for messaging and our applications are developed using Spring. So, we have planned to use spring-kafka.
The producer puts the message as HashMap object into the queue. We have JSON serializer and we assumed that the map will be serialized and put into the queue. And here is the producer config.
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
On the other hand, we have a listener which listens to the same topic where the producer has published the message. Here is the consumer config:
spring:
kafka:
consumer:
group-id: xyz
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Our listener method:
public void listener(SomeClass abx)
We were expecting the json will be de-serialized and an object of type "SomeClass" will be generated. But apparently, it throws de-serialization exception.
We saw few articles and the suggestion was to do something like:
@Bean
public ConsumerFactory<String, Car> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Car.class));
}
We don't want to write some code for creating the Deserializer. Is there any boilerplate thing which we are missing? Any help will be appreciated!!
Upvotes: 12
Views: 46409
Reputation: 497
I was consuming remote Kafka producer event and facing Class not found exception.
so finally I removed configuration form .properties file added below config class in consumer.
Here is my application.properties.
spring.application.name=payment-service
server.port=8082
spring.kafka.payment.bootstrap-servers= localhost:9092
spring.kafka.order.consumer.group-id.notification= group-id
spring.kafka.consumer.auto-offset-reset= latest
spring.kafka.order.topic.create-order=new_order1
Code:
@EnableKafka
@Configuration("NotificationConfiguration")
public class CreateOrderConsumerConfig {
@Value("${spring.kafka.payment.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.order.consumer.group-id.notification}")
private String groupId;
@Bean("NotificationConsumerFactory")
public ConsumerFactory<String, OrderEvent> createOrderConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.swiggy.payment.event.OrderEvent");// this my consumer event class
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(OrderEvent.class));
}
@Bean("NotificationContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> createOrderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createOrderConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
Upvotes: 0
Reputation: 174554
See the boot documentation. In particular:
You can also configure the Spring Kafka JsonDeserializer as follows:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
Upvotes: 23