Cannot catch Kafka TopicExistsException

I have written a program that creates Kafka topics on startup using AdminClient. If a topic already exists, a TopicExistsException is thrown, but I am not able to catch it. The relevant code:

try {
    CreateTopicsResult result = adminClient.createTopics(Collections.singleton(new NewTopic(topic, 2, (short) 1)));
    result.all().get(5, TimeUnit.SECONDS);
} catch (TopicExistsException e){
    log.info("topic exists: {}", topic);
} catch ( InterruptedException | TimeoutException | ExecutionException e){
    log.error("Topic create failed with: {}", e.getMessage());
}

I expected the first catch block to run, but according to the logs the second one runs instead:

2020-12-01 13:18:56,462 ERROR [restartedMain] com.mypackage.KafkaService@lambda$configureTopics$0:75 - Topic create failed with: org.apache.kafka.common.errors.TopicExistsException: Topic 'MEASUREMENT_TEST' already exists.

My imports for the exceptions are:

import org.apache.kafka.common.errors.TopicExistsException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.lang.InterruptedException;

Any suggestions? Is there anything I am missing?

Upvotes: 2

Views: 2498

Answers (1)

Kyle Fransham

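This happens because the TopicExistsException is wrapped in the ExecutionException that is thrown when the KafkaFuture completes exceptionally. An ExecutionException built from a cause uses the cause's toString() as its own message, which is why your log line appears to show a TopicExistsException even though the second catch block ran.

You can see the wrapping by logging the full stack trace rather than just getMessage():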

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'hello' already exists.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[kafka-clients-2.0.0.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[kafka-clients-2.0.0.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) ~[kafka-clients-2.0.0.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) ~[kafka-clients-2.0.0.jar:?]
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'hello' already exists.
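
The same wrapping can be reproduced without a broker. The sketch below is only an illustration: it uses the JDK's CompletableFuture as a stand-in for KafkaFuture, and FakeTopicExistsException and WrappingDemo are hypothetical names, not Kafka classes. It shows that a direct catch of the original exception type is never reached, because get() rethrows the failure inside an ExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for org.apache.kafka.common.errors.TopicExistsException.
class FakeTopicExistsException extends RuntimeException {
    FakeTopicExistsException(String message) {
        super(message);
    }
}

class WrappingDemo {
    // Returns a short description of which catch block handled the failure.
    static String describeFailure() {
        // Assumption: a CompletableFuture completed exceptionally plays the role
        // of the future returned by result.all().
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new FakeTopicExistsException("Topic 'hello' already exists."));
        try {
            future.get(5, TimeUnit.SECONDS);
            return "no exception";
        } catch (FakeTopicExistsException e) {
            // Not reached: get() does not rethrow the original exception directly.
            return "caught directly";
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return "wrapped: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException | TimeoutException e) {
            return "other: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints "wrapped: FakeTopicExistsException"
    }
}
```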

So to handle the TopicExistsException, catch the ExecutionException and inspect its cause, for example:

try {
    CreateTopicsResult result = adminClient.createTopics(Collections.singleton(new NewTopic(topic, 2, (short) 1)));
    result.all().get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    // get() wraps the real failure, so inspect the cause (instanceof is null-safe).
    if (e.getCause() instanceof TopicExistsException) {
        log.info("topic exists: {}", topic);
    } else {
        // Pass the exception as the last argument so the stack trace is logged.
        log.error("Topic create failed", e);
    }
} catch (InterruptedException | TimeoutException e) {
    log.error("Topic create failed with: {}", e.getMessage());
}
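
If the same check is needed in several places, the cause inspection could be pulled into a small helper. This is only a sketch under assumptions: FutureCauses and awaitIgnoring are hypothetical names, and it relies on the future being a java.util.concurrent.Future whose get() wraps failures in an ExecutionException, as the stack trace above shows.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka client API.
class FutureCauses {
    // Waits for the future to finish. Returns true if it succeeded, false if it
    // failed with a cause of the given "benign" type, and rethrows anything else.
    static boolean awaitIgnoring(Future<?> future, Class<? extends Throwable> benign,
                                 long timeout, TimeUnit unit)
            throws ExecutionException, InterruptedException, TimeoutException {
        try {
            future.get(timeout, unit);
            return true;
        } catch (ExecutionException e) {
            // isInstance is null-safe, like instanceof.
            if (benign.isInstance(e.getCause())) {
                return false;
            }
            throw e;
        }
    }
}
```

With the code from the question, a call might look like FutureCauses.awaitIgnoring(result.all(), TopicExistsException.class, 5, TimeUnit.SECONDS), where a false return value means the topic already existed.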

Upvotes: 4
