Gherbi Hicham

Reputation: 2574

org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms

I'm getting the error:

 org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.

When trying to produce to the topic in my local Kafka instance on Windows using Java. Note that the topic testtopic2 exists and I'm able to produce messages to it using the Windows console producer just fine.

Below is the code that I'm using:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Kafka_Producer {

    public static void main(String[] args){

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        TestCallback callback = new TestCallback();
        for (long i = 0; i < 100 ; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                    "testtopic2", "key-" + i, "message-"+i );
            producer.send(data, callback);
        }

        producer.close();
    }


    private static class TestCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Error while producing message to topic :" + recordMetadata);
                e.printStackTrace();
            } else {
                String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                System.out.println(message);
            }
        }
    }

}

Pom dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

Output of list and describe: (screenshots of the topic list and of describe testtopic2 were attached here)

Upvotes: 43

Views: 175451

Answers (19)

Debmalya Jash

Reputation: 311

After upgrading Spring Boot from 3.3.6 to 3.4.0, the following configuration threw the exception "Topic example-topic not present in metadata after 60000 ms":

@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers:localhost:9092", topics = {"example-topic"})

Using plain @EmbeddedKafka without the extra attributes, it worked. Thank you.

Upvotes: 0

Ashish Lahoti

Reputation: 978

I faced the same issue and found out that the Kafka cluster IPs were not whitelisted for the application deployed in the cloud. It worked after IP whitelisting.

Upvotes: 0

Bob Dobbs

Reputation: 236

I was having this same problem today. I'm a newbie at Kafka and was simply trying to get a sample Java producer and consumer running. I was able to get the consumer working, but kept getting the same "topic not present in metadata" error as you with the producer.

Finally, out of desperation, I added some code to my producer to dump the topics. When I did this, I got runtime errors because of missing classes in packages jackson-databind and jackson-core. After adding them, I no longer got the "topic not present" error. I removed the topic-dumping code I temporarily added and it still worked.

Upvotes: 22

Hany Sakr

Reputation: 2899

I faced the same issue: the producer simply wasn't able to connect to the bootstrap server. My problem was related to the JKS truststore for the SSL configuration; once I configured it correctly, everything started to work as usual.

Upvotes: 0

Chris Lebnitz

Reputation: 11

In the case of reading a stream from one topic and writing it to another, a possible solution could be:

<dataset being read>.drop("partition")

Explanation: each row in the read dataframe carries the source's partition column. If the source topic has more partitions than the destination topic, the writer will try to send each row to the same partition number on the destination; if that partition doesn't exist on the destination topic, you get this error.

I was also able to obtain a more comprehensive version of the error when deployed in cluster mode: Partition 22 of topic with partition count 3 is not present in metadata after 60000 ms.

The solution is to either drop the partition column and let Kafka choose the partition itself, or to replace the original partition number with a valid one (e.g. modulo the number of destination partitions).
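The modulo remapping described above can be sketched in plain Java (a hypothetical helper for illustration, not part of any Kafka API):

```java
public class PartitionRemap {

    // Map a source partition index onto a destination topic that has
    // fewer partitions, so the target partition always exists.
    static int remap(int sourcePartition, int destPartitionCount) {
        return sourcePartition % destPartitionCount;
    }

    public static void main(String[] args) {
        // Partition 22 of the source topic maps to partition 1
        // of a 3-partition destination topic (22 % 3 == 1).
        System.out.println(remap(22, 3));
    }
}
```

Dropping the column is still the simpler option, since it lets the producer's default partitioner spread the rows itself.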

Upvotes: 0

I was having the same problem, and it turned out to be a wrong config. Here's the producer configuration that worked for me. Replace the ${} placeholders with your own values, and don't forget to set all the properties:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ${servers});
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", "${user}:${pass}");
    props.put("sasl.kerberos.service.name", "kafka");
    props.put("auto.register.schemas", "false");
    props.put("schema.registry.url", "${https://your_url}");
    props.put("schema.registry.ssl.truststore.location", "client_truststore.jks");
    props.put("schema.registry.ssl.truststore.password", "${password}");

    KafkaProducer<String, ClassEvent> producer = new KafkaProducer<>(props);

    ClassEvent event = getEventObjectData();

    ProducerRecord<String, ClassEvent> record = new ProducerRecord<String, ClassEvent>(args[0], event);

Execution from cluster:

java -Djava.security.auth.login.config=${jaas.conf} -cp ${your-producer-example.jar} ${your.package.class.ClassName} ${topic}

Hope it helps

Upvotes: 0

  1. Add the two dependencies in the pom: kafka-streams and spring-kafka.
  2. In application.yml (or .properties):
    spring:
      kafka:
        bootstrap-servers: <service_url/bootstrap_server_url>
        producer:
          bootstrap-servers: <service_url/bootstrap_server_url>
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          group-id: <your_consumer_id>
  3. Add another annotation on the @SpringBootApplication class: @EnableKafka.

This will make it work without any errors.

Upvotes: 0

Rail Yulgutlin

Reputation: 446

In case you came here with the same error while setting up integration tests with Testcontainers: this can happen because of the port Kafka uses inside the container versus the port exposed outside. Make sure the started bootstrap server's port is correctly mapped to the exposed port you use in your tests.

In my case I just replaced the properties file entries after the Kafka container started:

KafkaContainer kafka = new KafkaContainer(...);
kafka.start();

String brokers = kafka.getBootstrapServers();
TestPropertySourceUtils.addInlinedPropertiesToEnvironment(context,
    "spring.kafka.bootstrap-servers=" + brokers,
    "spring.kafka.producer.bootstrap-servers=" + brokers,
    "spring.kafka.consumer.bootstrap-servers=" + brokers
);

Spent quite some time before I figured it out, hope this helps someone.

Upvotes: 0

lmo

Reputation: 547

I was facing the same issue. It can happen when your bootstrap or schema registry URL is wrong or unreachable.

Upvotes: 0

Federico Nafria

Reputation: 1600

It might also be caused by a nonexistent partition.

e.g. if you have a single partition [0] and your producer tries to send to partition [1], you'll get the same error. The topic exists in this case, but not the partition.
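For illustration: the producer lets you target an explicit partition via new ProducerRecord<>(topic, partition, key, value). The sketch below (plain Java, a hypothetical helper, no broker involved) just shows the range check such an explicit index must pass:

```java
public class PartitionCheck {

    // True when the requested partition index exists on a topic
    // with the given partition count (valid indices are 0..count-1).
    static boolean exists(int requestedPartition, int partitionCount) {
        return requestedPartition >= 0 && requestedPartition < partitionCount;
    }

    public static void main(String[] args) {
        // Topic with a single partition [0]:
        System.out.println(exists(0, 1)); // partition 0 is fine
        System.out.println(exists(1, 1)); // partition 1 does not exist
    }
}
```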

Upvotes: 25

kazzaki

Reputation: 72

You may want to check your producer properties for metadata.max.idle.ms.

The producer caches topic metadata for as long as the value configured there; any change to the metadata on the broker side will not be visible on the client (producer) immediately. Restarting the producer, however, re-reads the metadata at startup.

Update: check the default value here: https://kafka.apache.org/documentation.html#metadata.max.idle.ms

Upvotes: 0

soccerlover

Reputation: 41

This error is only a surface symptom; it can be triggered by the following underlying conditions.

  1. Most commonly, your Kafka producer config is wrong: check that BOOTSTRAP_SERVERS_CONFIG points to the correct server address.
  2. In a Docker environment, check your port mapping.
  3. Check whether the firewall has opened port 9092 on the server where the broker is located.
  4. If your broker runs with SSL, check your producer config for SSL_TRUSTSTORE_LOCATION_CONFIG, SECURITY_PROTOCOL_CONFIG, and SSL_TRUSTSTORE_TYPE_CONFIG. Also, some brokers listen on both SSL and PLAINTEXT; make sure you're using the port you need.
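For case 4, a minimal SSL client configuration might look like the sketch below (broker address, truststore path and password are placeholders, not values from this thread; the string keys are the standard Kafka client property names):

```java
import java.util.Properties;

public class SslProducerConfigSketch {

    // Assemble hypothetical SSL settings for a Kafka producer.
    static Properties sslProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.example.com:9093"); // the SSL listener port
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/client_truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        props.put("ssl.truststore.type", "JKS");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(sslProps().getProperty("security.protocol"));
    }
}
```

Pass these props, together with the serializers, into new KafkaProducer<>(props).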

Upvotes: 4

kafka-topics --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3

First, create the topic in Kafka using the above command,

where my_first is the topic name.

Upvotes: 0

phlogiston

Reputation: 371

I saw this issue when someone on my team had changed the value for the spring.kafka.security.protocol config (we are using Spring on my project). Previously it had been "SSL" in our config, but it was updated to be PLAINTEXT. In higher environments where we connect to a cluster that uses SSL, we saw the error OP ran into.

Why we saw this error as opposed to an SSL error or authentication error is beyond me, but if you run into this error it may be worth double checking your client authentication configs to your Kafka cluster.

Upvotes: 8

void

Reputation: 1514

Note that this could also happen because the versions of kafka-clients and Spring Kafka are not compatible.

More info in the "Kafka Client Compatibility" matrix at https://spring.io/projects/spring-kafka

Upvotes: 0

Dmitriy Fialkovskiy

Reputation: 3225

This error can also appear because the destination Kafka instance has "died" or the URL to it is wrong.

In such a case, the thread that sends the message to Kafka will be blocked for max.block.ms, which defaults to exactly 60000 ms.

You can check whether it is because of the above property by passing a changed value:

Properties props = new Properties();
// ... (among others)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); // 30 sec, or any other value of your choice

If the TimeoutException is thrown after your specified time, then you should check whether your URL to Kafka is correct and whether the Kafka instance is alive.

Upvotes: 16

java_enthu

Reputation: 2327

I also had a similar issue when trying this on my local environment on my MacBook. It was quite frustrating, and I tried a few approaches:

  1. Stopped ZooKeeper, stopped Kafka, then restarted ZK and Kafka. (Didn't help)
  2. Stopped ZK. Deleted the ZK data directory. Deleted Kafka's log.dirs and restarted Kafka. (Didn't help)
  3. Restarted my MacBook - this did the trick.

I have used Kafka in production for more than 3 years and never faced this problem on a cluster; it happened only on my local environment. However, restarting fixes it for me.

Upvotes: 5

Steephen

Reputation: 15814

  1. I created a topic with a single partition and then tried to produce to it across 10 partitions, and I got this issue.

  2. I deleted the topic using the kafka-topics.sh script but didn't wait long enough for the cleanup to finish before producing to it again. Looking at the topic metadata, it still had one partition, and I got exactly the same issue as in the first part of this answer.

Upvotes: 1

Brandon

Reputation: 111

First off, I want to say thanks to Bob Dobbs for his answer; I was also struggling with this for a while today. I just want to add that the only dependency I had to add was jackson-databind, which is the only dependency in my project besides kafka-clients.

Update: I've learned a bit more about what's going on. kafka-clients sets the scope of its jackson-databind dependency to "provided," which means it expects it to be provided at runtime by the JDK or a container. See this article for more details on the provided Maven scope:

This scope is used to mark dependencies that should be provided at runtime by JDK or a container, hence the name. A good use case for this scope would be a web application deployed in some container, where the container already provides some libraries itself.

I'm not sure of the exact reasoning for setting its scope to provided, except that maybe it's a library people would normally want to provide themselves, to keep it at the latest version for security fixes, etc.
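In that situation, declaring jackson-databind explicitly in your own pom is one way to satisfy the provided-scope dependency (the version below is an assumption; pick one compatible with your kafka-clients version):

```xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.5</version>
</dependency>
```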

Upvotes: 7
