BlueStar

Reputation: 411

How to get an acknowledgement or a callback from a Kafka consumer to the producer

I have an example Spring Boot project in which a Kafka producer sends a message to a Kafka consumer.

This is what my Kafka producer config looks like:

@Configuration
public class KafkaProducerConfig {

@Value("${my.kafka.bootstrap-servers}")
private String bootstrapServer;

@Bean
public ProducerFactory<String, Customer> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(ProducerConfig.ACKS_CONFIG, "all");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Customer> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
}

Below is my service class for the Kafka producer:

@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Customer> kafkaTemplate;

@Value("${my.kafka.topic}")
String kafkaTopic = "my-test";

public void send(Customer customer) {
    System.out.println("sending customer data=" + customer);
    kafkaTemplate.send(kafkaTopic, customer);
}
}

My Kafka consumer config class is shown below:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Value("${my.kafka.bootstrap-servers}")
private String bootstrapServer;

@Value("${my.kafka.consumer.group-id}")
private String groupId;

@Bean
public ConsumerFactory<String, Customer> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props,
                                      new StringDeserializer(), 
                                      new JsonDeserializer<>(Customer.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

}

Following is my Kafka Consumer class:

@Service
public class KafkaConsumer {

@KafkaListener(topics="${my.kafka.topic}")
public void processMessage(Customer customer) {
    System.out.println("received customer details = " + customer);
}
}

I would like to send an acknowledgement back to the producer, possibly a message that includes the customer name, but I cannot figure out how to achieve this. Is there a way to extract sender data at the consumer end so that I can produce a message back to the producer?

Upvotes: 2

Views: 6060

Answers (1)

Sunil Singhal

Reputation: 603

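One way to achieve this is to publish an acknowledgement message on another topic that the producer listens to. That message can contain the data you want (for example, the customer name) along with correlation data, such as an ID copied from the original message, so that the producer can match each acknowledgement to the message it sent.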
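
To make the correlation idea concrete, here is a minimal sketch in plain Java. It is not Kafka code: two in-memory queues stand in for the customer topic and the acknowledgement topic, and the class name, the `Message` shape, and the `"ack: "` prefix are all hypothetical names chosen for illustration. The point is only the bookkeeping: the producer remembers each correlation ID it sent, the consumer copies that ID into its acknowledgement, and the producer uses it to find the matching pending request.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: in-memory queues stand in for the two Kafka topics.
class AckCorrelationSketch {

    // Assumed message shape: a correlation ID plus a payload.
    static final class Message {
        final String correlationId;
        final String payload;

        Message(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    static final BlockingQueue<Message> customerTopic = new LinkedBlockingQueue<>();
    static final BlockingQueue<Message> ackTopic = new LinkedBlockingQueue<>();

    // Producer side: requests still waiting for an acknowledgement, keyed by correlation ID.
    static final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Producer: tag the outgoing message with a fresh correlation ID and remember it.
    static CompletableFuture<String> send(String customerName) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> ack = new CompletableFuture<>();
        pending.put(id, ack);
        customerTopic.add(new Message(id, customerName));
        return ack;
    }

    // Consumer: process one message, then publish an ack that carries the same correlation ID.
    static void consumeOne() throws InterruptedException {
        Message received = customerTopic.take();
        ackTopic.add(new Message(received.correlationId, "ack: " + received.payload));
    }

    // Producer's listener on the ack topic: complete the matching pending request, if any.
    static void handleAck() throws InterruptedException {
        Message ack = ackTopic.take();
        CompletableFuture<String> waiting = pending.remove(ack.correlationId);
        if (waiting != null) {
            waiting.complete(ack.payload);
        }
    }

    // Runs one full send -> consume -> acknowledge cycle on the calling thread.
    static String roundTrip(String customerName) throws Exception {
        CompletableFuture<String> ack = send(customerName);
        consumeOne();
        handleAck();
        return ack.get(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("Alice")); // prints "ack: Alice"
    }
}
```

In a real setup the two queues would be Kafka topics, the consumer would publish its acknowledgement with its own KafkaTemplate, and the producer application would have a @KafkaListener on the acknowledgement topic. Spring Kafka also ships a ReplyingKafkaTemplate for this request/reply pattern, which may save you from writing the correlation bookkeeping yourself.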

Upvotes: 1
