Reputation: 411
I have an example Spring Boot project in which a Kafka producer sends a message to a Kafka consumer.
This is what my Kafka producer config looks like:
    @Configuration
    public class KafkaProducerConfig {

        @Value("${my.kafka.bootstrap-servers}")
        private String bootstrapServer;

        @Bean
        public ProducerFactory<String, Customer> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            configProps.put(ProducerConfig.ACKS_CONFIG, "all");
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, Customer> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
Below is my service class for the Kafka producer:
    @Service
    public class KafkaProducer {

        @Autowired
        private KafkaTemplate<String, Customer> kafkaTemplate;

        // Falls back to "my-test" when my.kafka.topic is not set; a field
        // initializer would be overwritten by the @Value injection.
        @Value("${my.kafka.topic:my-test}")
        private String kafkaTopic;

        public void send(Customer customer) {
            System.out.println("sending customer data=" + customer);
            kafkaTemplate.send(kafkaTopic, customer);
        }
    }
My Kafka consumer config class is shown below:
    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {

        @Value("${my.kafka.bootstrap-servers}")
        private String bootstrapServer;

        @Value("${my.kafka.consumer.group-id}")
        private String groupId;

        @Bean
        public ConsumerFactory<String, Customer> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props,
                    new StringDeserializer(),
                    new JsonDeserializer<>(Customer.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Customer> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
The following is my Kafka consumer class:
    @Service
    public class KafkaConsumer {

        @KafkaListener(topics = "${my.kafka.topic}")
        public void processMessage(Customer customer) {
            System.out.println("received customer details = " + customer);
        }
    }
I would like to send an acknowledgement back to the producer, possibly a message that includes the customer name, but I cannot figure out how to achieve this. Is there a way to extract sender data on the consumer side so that I can produce a message back to the producer?
Upvotes: 2
Views: 6060
Reputation: 603
One way to achieve this is to publish an acknowledgement message on another topic that the producer listens to. That message can contain the data you want along with correlation data, so the producer can match each acknowledgement to the message it originally sent.
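To make the correlation idea concrete, here is a minimal sketch in plain Java. It does not use Kafka at all: two in-memory queues stand in for the customer topic and the acknowledgement topic, and the class, field, and method names (`AckOverReplyTopicSketch`, `customerTopic`, `ackTopic`, `pending`, and so on) are hypothetical names chosen for illustration. The only point is to show how a correlation id lets the producer match an acknowledgement to the message it sent.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AckOverReplyTopicSketch {

    /** Stand-in for a Kafka record: a payload plus a correlation id (which could travel as a header). */
    static final class Message {
        final String correlationId;
        final String payload;

        Message(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // In-memory stand-ins for the two topics (assumption: real code would use Kafka topics).
    static final BlockingQueue<Message> customerTopic = new LinkedBlockingQueue<>();
    static final BlockingQueue<Message> ackTopic = new LinkedBlockingQueue<>();

    // Producer side: messages that were sent but not yet acknowledged, keyed by correlation id.
    static final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Producer side: tag the message with a fresh correlation id and remember it. */
    static CompletableFuture<String> send(String customerName) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> ack = new CompletableFuture<>();
        pending.put(correlationId, ack);
        customerTopic.add(new Message(correlationId, customerName));
        return ack;
    }

    /** Consumer side: process one message, then publish an ack that echoes the correlation id. */
    static void consumeOne() throws InterruptedException {
        Message received = customerTopic.take();
        String reply = "received customer " + received.payload;
        ackTopic.add(new Message(received.correlationId, reply));
    }

    /** Producer side: read one ack and complete the matching pending request, if any. */
    static void handleOneAck() throws InterruptedException {
        Message ack = ackTopic.take();
        CompletableFuture<String> waiting = pending.remove(ack.correlationId);
        if (waiting != null) {
            waiting.complete(ack.payload);
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> ack = send("Alice");
        consumeOne();
        handleOneAck();
        System.out.println(ack.get(1, TimeUnit.SECONDS)); // prints "received customer Alice"
    }
}
```

In a Spring Kafka application the two queues would be real topics: the consumer would publish the acknowledgement with its own `KafkaTemplate`, and the producing application would have a `@KafkaListener` on the acknowledgement topic. Spring Kafka also provides `ReplyingKafkaTemplate`, which handles this kind of request/reply correlation for you; whether it fits depends on your setup.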
Upvotes: 1