Reputation: 131
I have written a program that creates Kafka topics on startup with AdminClient. If a topic already exists, a TopicExistsException is thrown, but I am not able to catch it. Relevant code:
try {
    CreateTopicsResult result = adminClient.createTopics(Collections.singleton(new NewTopic(topic, 2, (short) 1)));
    result.all().get(5, TimeUnit.SECONDS);
} catch (TopicExistsException e) {
    log.info("topic exists: {}", topic);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
    log.error("Topic create failed with: {}", e.getMessage());
}
The expected behavior would be that the first catch is executed, but based on the logs this is not what happens:
2020-12-01 13:18:56,462 ERROR [restartedMain] com.mypackage.KafkaService@lambda$configureTopics$0:75 - Topic create failed with: org.apache.kafka.common.errors.TopicExistsException: Topic 'MEASUREMENT_TEST' already exists.
My imports for the exceptions are:
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.lang.InterruptedException;
Any suggestions? Is there anything I am missing?
Upvotes: 2
Views: 2498
Reputation: 1879
This is because the TopicExistsException is wrapped in the ExecutionException that get() throws when the KafkaFuture completes with an error.
You can see this by logging the full stack trace, not just the getMessage() part:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'hello' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) ~[kafka-clients-2.0.0.jar:?]
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'hello' already exists.
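The same wrapping can be reproduced without a broker. The following is only a minimal sketch: it uses a plain java.util.concurrent.CompletableFuture instead of a KafkaFuture, and AlreadyExistsException is a hypothetical stand-in for TopicExistsException, so it illustrates the behavior of get() rather than the Kafka client itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WrappedCauseDemo {

    // Hypothetical stand-in for TopicExistsException so the sketch runs without Kafka.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String message) {
            super(message);
        }
    }

    // Completes a future with an error and reports which catch branch handled it.
    static String describeFailure() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new AlreadyExistsException("Topic 'hello' already exists."));
        try {
            future.get(5, TimeUnit.SECONDS);
            return "no failure";
        } catch (AlreadyExistsException e) {
            // Not reached: get() never throws the original exception directly.
            return "caught directly";
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return "wrapped: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException | TimeoutException e) {
            return "interrupted or timed out";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

The first catch compiles but is never taken, which matches what the question's log shows: the failure only arrives as the cause of the ExecutionException.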
So if you want to handle the TopicExistsException, you have to catch the ExecutionException and inspect its cause, something like:
try {
    CreateTopicsResult result = adminClient.createTopics(Collections.singleton(new NewTopic(topic, 2, (short) 1)));
    result.all().get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    // get() wraps the real failure; instanceof is false for a null cause, so no separate null check is needed
    if (e.getCause() instanceof TopicExistsException) {
        log.info("topic exists: {}", topic);
    } else {
        // pass the exception as the last argument so the full stack trace is logged
        log.error("Topic create failed", e);
    }
} catch (InterruptedException | TimeoutException e) {
    log.error("Topic create failed with: {}", e.getMessage());
}
Upvotes: 4