Deepank Porwal

Reputation: 73

No exception is thrown when sending a message while Kafka is down

I have a scenario where Kafka goes down at random times. When I send a message while Kafka is down, I want to handle that case separately. However, no exception is thrown in that situation, so I cannot tell whether Kafka is down. Is there any way to catch the exception and recover the lost message?

My code is very basic:

public static void main(String[] args) {
    String topicName = "test-1";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
    }
    System.out.println("Message sent successfully");
    producer.close();
}

Upvotes: 6

Views: 2542

Answers (1)

Giorgos Myrianthous

Reputation: 39810

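You need to make the send call synchronous in order to get a response back from Kafka. producer.send() is asynchronous and returns a Future, so a failure is reported through that Future rather than thrown at the call site.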

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i));
    try {
        producer.send(record).get();
    } catch (Exception e) {
        e.printStackTrace();
        // Handle message that has failed
    }
}

Calling .get() on the Future returned by send() blocks until Kafka responds. If the record could not be written to Kafka, it throws an exception (an ExecutionException wrapping the underlying cause). If the record was sent successfully, it returns a RecordMetadata.
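To illustrate why the failure stays invisible until .get() is called, here is a minimal sketch that uses only the JDK, with no Kafka dependency. The fakeSend method and the brokerUp flag are hypothetical stand-ins for producer.send() and the broker state, not part of the Kafka API. The point is only the Future mechanics: the failure lives inside the Future, and .get() rethrows it as an ExecutionException so the failed message can be collected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SendFailureDemo {

    // Hypothetical stand-in for producer.send(): it never throws directly,
    // it reports failure only through the returned Future.
    static Future<String> fakeSend(String value, boolean brokerUp) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (brokerUp) {
            result.complete("stored " + value);
        } else {
            result.completeExceptionally(new IllegalStateException("broker unavailable"));
        }
        return result;
    }

    // Sends every message synchronously and returns the ones whose Future failed.
    static List<String> sendAll(List<String> messages, boolean brokerUp) {
        List<String> failed = new ArrayList<>();
        for (String message : messages) {
            try {
                fakeSend(message, brokerUp).get(); // blocks and rethrows the async failure
            } catch (ExecutionException e) {
                // e.getCause() holds the original failure; keep the message for later handling
                failed.add(message);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop sending
                Thread.currentThread().interrupt();
                failed.add(message);
                break;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Fire-and-forget: the Future is discarded, so the failure goes unnoticed.
        fakeSend("ignored", false);

        // Synchronous: every failure is caught and the message is retained.
        System.out.println("Failed messages: " + sendAll(List.of("0", "1", "2"), false));
    }
}
```

With a real producer the same pattern applies to the Future<RecordMetadata> returned by send(). Note that blocking on every record gives up batching, so throughput will likely be lower than with asynchronous sends.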

Upvotes: 6
