Reputation: 4090
I'm trying to handle failures in case the producer fails to send message to Kafka :
try {
Future<RecordMetadata> res = producer.send(new ProducerRecord<>(topic, msg.key(), msg));
log.info("Waiting for confirmation from kafka for message : \n {}",msg.toString());
record = res.get(30,TimeUnit.SECONDS);
log.info("Successfully produced message : msg")
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("The following message wasnt sent to kafka because of an error : {}", msg.toString(), e);
}
When I try to produce a message to a topic that doesn't exist I'm seeing the following error :
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic just_fake_topic not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1307) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:962) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:750) ~[kafka-clients-2.5.0.jar!/:na]
at com.xx.xx.produceMessage.handleMessage(produceMessage.java:27) ~[classes!/:0.0.1-SNAPSHOT]
at
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic just_fake_topic not present in metadata after 60000 ms.
I'm getting the timeout after 60 seconds not 30 seconds as I configured in the get method of the future object.
I tried also to configure metadata.max.idle.ms=30000
and max.block.ms=30000
in my kafka.properties because I was also getting the following warning for about 5 minutes but it didn't help:
2020-06-28 14:31:26.009 WARN 10291 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 695 : {just_fake_topic =UNKNOWN_TOPIC_OR_PARTITION}
Any idea why ?
Upvotes: 0
Views: 3389
Reputation: 41
The exception might be getting thrown from the Kafka network thread.
https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-producer-internals-Sender.html
Maybe provide a callback as a second parameter for send. Note: callback will run on the kafka network thread.
Kafka Producer : Handle Exception in Async Send with Callback
Upvotes: 1