Reputation: 73
I have scenario where kafka is getting down randomly. When i send message to kafka and it is down i want to handle it separately. But When i send message to kafka while it is down, no exception is coming and so i am not able to identify whether kafka is down or not. Is there any way i can catch exception and the lost message.
I have a very basic code
public static void main(String[] args) {
String topicName = "test-1";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
}
System.out.println("Message sent successfully");
producer.close();
}
Upvotes: 6
Views: 2542
Reputation: 39810
You need to send a synchronous call in order to get a response back from Kafka.
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i));
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
// Handle message that has failed
}
}
Method .get()
will return the response from Kafka. It will throw an exception when the record has failed to be pushed to Kafka. When the record has been successfully sent, then a RecordMetadata
will be returned.
Upvotes: 6