Shaaban Ebrahim
Shaaban Ebrahim

Reputation: 10422

TimeoutException thrown when sending to topic

i have made sender class using kafkatemplate bean to send payload to topic with some configuration in SenderConfiguration class .

Sender Class

@Component
public class Sender {
    private static final Logger LOGGER = (Logger)  LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String>   kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);

        kafkaTemplate.send(topic, "1", payload);
    }
}

, senderConfiguration class

@Configuration
public class SenderConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}

the problem is in sending not in producing

here the application.yml file properties

kafka:
   bootstrap-servers: localhost:9092
topic:
   helloworld: helloworld.t

and simply controller containing

@RestController
public class Controller {
    protected final static String HELLOWORLD_TOPIC = "helloworld.t";

    @Autowired
    private Sender sender;

    @RequestMapping("/send")
    public String SendMessage() {

        sender.send(HELLOWORLD_TOPIC, "message");

        return "success";
    }
}

and the exception is

2017-12-20 09:58:04.645  INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2017-12-20 09:58:04.645  INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2017-12-20 09:59:04.654 ERROR 10816 --- [nio-7060-exec-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a      message with key='1' and payload='message' to topic helloworld.t:

org.apache.kafka.common.errors.TimeoutException: Failed to update   metadata after 60000 ms.

Upvotes: 3

Views: 10636

Answers (3)

Niket Shah
Niket Shah

Reputation: 341

There are few possibility for this type of error.

  1. Kafka broker is not reachable using configured port.
    • For this try to telnet using telnet localhost 9092 if you are getting output that means kafka borker is running
  2. Check kafka-client version that spring-boot uses is the same as your kafka version. If the version mismatch, kafka may ne able to send data to topic

  3. Sometimes it take time for the brokers to know about the newly created topic. So, the producers might fail with the error Failed to update metadata after 60000 ms. To get around this create kafka manually using kafka command line options.

  4. Listener configuration of server.properties not working.

You can try this as well

  • change the "bootstrap.servers" property or the --broker-list option to 0.0.0.0:9092

  • change the server.properties in 2 properties

    • listeners = PLAINTEXT://your.host.name:9092 to listeners=PLAINTEXT://:9092
    • advertised.listeners=PLAINTEXT://your.host.name:9092 to advertised.listeners=PLAINTEXT://localhost:9092

Hope that helps!

Upvotes: 0

Santosh Rachakonda
Santosh Rachakonda

Reputation: 143

That means your brokers are not running. check server.log and restart broker if necessary

Upvotes: 0

OneCricketeer
OneCricketeer

Reputation: 191884

Use the method that includes a key

kafkaTemplate.send(topic, key, payload);

Its not clear what key value you want to use, but it should evenly distribute amongst the partition count of the topic. For example, a random number within the range of the partition count.

Upvotes: 0

Related Questions