Reputation: 73
I want to send Java Object through Kafka and receive them as Java Object as well. I thought the following configuration for ProducerFactory and ConsumerFactory will suffice, but I am receiving payload as org.apache.kafka.clients.consumer.ConsumerRecord. What do I need to change in ConsumerFactory?
Consumer Configuration:
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, Object> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);
deserializer.addTrustedPackages("*");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> headersKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("headers"));
return factory;
}
}
Producer Configuration:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Upvotes: 2
Views: 6035
Reputation: 191681
I am receiving payload as org.apache.kafka.clients.consumer.ConsumerRecord
Have you looked at the Javadoc for that class and noticed the .value()
method?
FWIW, Avro or Protobuf serializers are more efficient at sending objects than JSON
Upvotes: 1
Reputation: 528
As you mentioned you can get the java object using the value of the consumer record.
@component
public class Listener {
@kafkaListener
void listen(ConsumerRecord<String, Car> record) {
Car car = record.value();
}
}
by accessing the record.value(). You can get the relevant java object for that you have to have to deserializers in place.
Upvotes: 4