definepi314

Reputation: 73

Send / Receive Java Objects through Kafka

I want to send Java objects through Kafka and receive them as Java objects as well. I thought the following configuration for ProducerFactory and ConsumerFactory would suffice, but I am receiving the payload as org.apache.kafka.clients.consumer.ConsumerRecord. What do I need to change in ConsumerFactory?

Consumer Configuration:

@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, Object> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);

        deserializer.addTrustedPackages("*");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> headersKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("headers"));
        return factory;
    }

}

Producer Configuration:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Upvotes: 2

Views: 6035

Answers (2)

OneCricketeer

Reputation: 191681

I am receiving payload as org.apache.kafka.clients.consumer.ConsumerRecord

Have you looked at the Javadoc for that class and noticed the .value() method?

FWIW, Avro or Protobuf serializers are more efficient at sending objects than JSON.
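To illustrate the size difference, here is a rough, dependency-free sketch. It does not use Avro or Protobuf: the binary encoding is a hand-rolled stand-in built on java.io.DataOutputStream, and the Car type and its fields are hypothetical. It only shows the general idea that a schema-based format can skip field names that JSON repeats in every message.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class EncodingSizeSketch {

    // Hypothetical payload type, used only for this illustration.
    static final class Car {
        final String make;
        final int year;

        Car(String make, int year) {
            this.make = make;
            this.year = year;
        }
    }

    // JSON-style text encoding: the field names travel with every message.
    static byte[] toJsonBytes(Car car) {
        String json = "{\"make\":\"" + car.make + "\",\"year\":" + car.year + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Hand-rolled binary encoding (a stand-in, NOT Avro or Protobuf):
    // the field order is agreed on in advance, so only the values are written.
    static byte[] toBinaryBytes(Car car) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(car.make); // 2-byte length prefix followed by the string bytes
            out.writeInt(car.year); // fixed 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        Car car = new Car("Audi", 2020);
        System.out.println("JSON bytes:   " + toJsonBytes(car).length);
        System.out.println("binary bytes: " + toBinaryBytes(car).length);
    }
}
```

Real Avro and Protobuf encodings differ in detail (variable-length integers, schema evolution support), so treat the numbers from this sketch as an illustration of the principle only.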

Upvotes: 1

Chamina Sathsindu

Reputation: 528

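As mentioned, you can get the Java object from the value of the consumer record.

 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;

 @Component
 public class Listener {

  // "cars" is a hypothetical topic name; use your own topic here.
  @KafkaListener(topics = "cars")
  void listen(ConsumerRecord<String, Car> record) {
   // value() returns the payload already deserialized into a Car
   Car car = record.value();
  }
 }

Calling record.value() returns the relevant Java object, provided the matching deserializers are in place.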
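As a possible variation, the listener can also take the payload type directly instead of the ConsumerRecord wrapper. The following is a minimal sketch under several assumptions: the Car class, the topic name "cars", the group id "cars", and the bean name carKafkaListenerContainerFactory are all hypothetical, and the consumer factory is typed to Car rather than Object so that JsonDeserializer has a concrete target type.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.stereotype.Component;

// Hypothetical payload type; public fields keep the sketch short.
class Car {
    public String make;
    public int year;
}

@EnableKafka
@Configuration
class CarConsumerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, Car> carConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cars"); // hypothetical group id

        // Deserialize message values into Car instead of a generic Object.
        JsonDeserializer<Car> valueDeserializer = new JsonDeserializer<>(Car.class);
        // "*" trusts every package, as in the question; narrow this in real code.
        valueDeserializer.addTrustedPackages("*");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Car> carKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Car> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(carConsumerFactory());
        return factory;
    }
}

@Component
class CarListener {

    // The payload parameter receives the deserialized value of each record.
    @KafkaListener(topics = "cars", containerFactory = "carKafkaListenerContainerFactory")
    void listen(Car car) {
        System.out.println("Received car: " + car.make + " (" + car.year + ")");
    }
}
```

Whether to accept the ConsumerRecord or the bare payload is mostly a question of whether the listener needs the key, headers, partition, or offset; if it only needs the object, the payload parameter is usually enough.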

Upvotes: 4
