whomer

Reputation: 615

How do you programmatically create a topic with Kafka 1.1.0?

I recently upgraded to Kafka 1.1.0 and am trying to write unit tests for a Kafka consumer. Ideally, the unit test would create the topic it uses. I found some code that looks like it should do what I want, but when I run it, it throws an exception: java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V

Here is the topic-creation code, which I found online:

@BeforeClass
public static void createTopic() {
    try (final AdminClient adminClient = AdminClient.create(configure())) {
        try {
            // Define the topic: 1 partition, replication factor 1.
            NewTopic newTopic = new NewTopic("test-orders", 1, (short) 1);

            // Create the topic. This is an asynchronous call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is asynchronous, wait for it to complete.
            createTopicsResult.values().get(ordersTopic).get();
        } catch (InterruptedException | ExecutionException e) {
            // An already existing topic is fine; rethrow anything else.
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

However, it throws this exception when I run it:

java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:334)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
at com.sial.notifications.topics.OrdersTopicsTests.createTopic(OrdersTopicsTests.java:162)

The only configuration parameters I pass to it are the bootstrap servers and a client.id. What am I doing wrong? It seems simple enough.

Upvotes: 3

Views: 4879

Answers (2)

Marius Waldal

Reputation: 9952

A much easier way is to configure the Kafka broker to automatically create any topic you use that does not already exist. This is controlled by the broker setting

auto.create.topics.enable

which defaults to true. With it enabled, no extra code is needed to create topics. You just use whatever topic name you want, and Kafka creates it the first time you produce to or consume from it.
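As a rough sketch, the relevant part of the broker configuration could look like the fragment below. It assumes a stock server.properties file; the values for num.partitions and default.replication.factor are illustrative only, and they determine how auto-created topics are sized.

```properties
# Let the broker create a topic the first time a client uses it.
auto.create.topics.enable=true

# Defaults applied to auto-created topics (illustrative values).
num.partitions=1
default.replication.factor=1
```

Note that the topic gets these broker-wide defaults rather than settings chosen by the test, which is the trade-off against creating it explicitly with AdminClient.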

Upvotes: 0

vahid

Reputation: 1218

This slightly modified code worked for me when I ran it stand-alone against a 1.1.0 broker:

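import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

// The class name is arbitrary; only main() comes from my test run.
public class CreateTopicExample {
    public static void main(String[] args) {
        final String ordersTopic = "test-orders";
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (final AdminClient adminClient = AdminClient.create(props)) {
            try {
                // Define the topic: 1 partition, replication factor 1.
                NewTopic newTopic = new NewTopic(ordersTopic, 1, (short) 1);

                // Create the topic. This is an asynchronous call.
                final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

                // Since the call is asynchronous, wait for it to complete.
                createTopicsResult.values().get(ordersTopic).get();
            } catch (InterruptedException | ExecutionException e) {
                // An already existing topic is fine; rethrow anything else.
                if (!(e.getCause() instanceof TopicExistsException))
                    throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}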

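Since this is pretty similar to your code, the problem is probably not in the code itself. A NoSuchMethodError at runtime usually means that classes from mismatched library versions are on the classpath, so perhaps you haven't completely sorted out your dependencies on the Kafka libraries? I used the Maven artifact org.apache.kafka:kafka_2.12:1.1.0.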
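One way to test the dependency theory is to ask the JVM which jar the Utils class actually comes from and whether it has the method named in the error. The following is only a sketch using plain reflection. The class name ClasspathCheck and its two helper methods are made up for illustration and are not part of Kafka.

```java
import java.security.CodeSource;

final class ClasspathCheck {

    /** Returns where a class was loaded from, or "unknown" when no code source is recorded (e.g. JDK classes). */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "unknown";
        }
        return source.getLocation().toString();
    }

    /** Returns true if the named class can be loaded and has a public method with exactly this signature. */
    public static boolean hasMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            Class<?> type = Class.forName(className);
            type.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method taken from the NoSuchMethodError in the question.
        String utils = "org.apache.kafka.common.utils.Utils";
        boolean present = hasMethod(utils, "closeQuietly", AutoCloseable.class, String.class);
        System.out.println("closeQuietly(AutoCloseable, String) present: " + present);
        try {
            System.out.println("Utils loaded from: " + locationOf(Class.forName(utils)));
        } catch (ClassNotFoundException e) {
            System.out.println("Utils is not on the classpath");
        }
    }
}
```

Run it with the same classpath as the failing test. If the method is reported missing, the printed jar location should point at the stale kafka-clients version. With Maven, `mvn dependency:tree -Dincludes=org.apache.kafka` can then show which dependency pulls it in.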

Upvotes: 6
