Reputation: 10422
I have written a Sender class that uses a KafkaTemplate bean to send a payload to a topic, with its configuration in a SenderConfig class.
Sender class:
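import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends the payload to the given topic with the fixed key "1".
    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, "1", payload);
    }
}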
SenderConfig class:
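import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class SenderConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}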
The producer is created without errors; the problem occurs when sending.
Here are the application.yml properties:
kafka:
  bootstrap-servers: localhost:9092
  topic:
    helloworld: helloworld.t
and a simple controller containing:
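import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {
    protected final static String HELLOWORLD_TOPIC = "helloworld.t";

    @Autowired
    private Sender sender;

    // Sends a fixed test message to the topic when /send is requested.
    @RequestMapping("/send")
    public String SendMessage() {
        sender.send(HELLOWORLD_TOPIC, "message");
        return "success";
    }
}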
And the exception is:
2017-12-20 09:58:04.645 INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1
2017-12-20 09:58:04.645 INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247
2017-12-20 09:59:04.654 ERROR 10816 --- [nio-7060-exec-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='1' and payload='message' to topic helloworld.t:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Upvotes: 3
Views: 10636
Reputation: 341
There are a few possible causes for this type of error.
1. The broker is not reachable. Check with:
   telnet localhost 9092
   If you get a connection, the Kafka broker is running.
2. Check that the kafka-clients version that Spring Boot uses is the same as your Kafka version. If the versions do not match, the producer may not be able to send data to the topic.
3. Sometimes it takes time for the brokers to learn about a newly created topic, so the producer might fail with the error "Failed to update metadata after 60000 ms." To get around this, create the topic manually using the Kafka command-line tools.
4. The listener configuration in server.properties is not working. You can try this as well:
   - change the "bootstrap.servers" property or the --broker-list option to 0.0.0.0:9092
   - change the two corresponding properties in server.properties
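If telnet is not available, the same reachability check can be done from Java. This is only a minimal sketch: the class name BrokerProbe and the 2000 ms timeout are made up for illustration, and localhost:9092 is taken from the question's application.yml.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerProbe {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port are the values from the question; adjust to your setup.
        System.out.println("broker reachable: " + isReachable("localhost", 9092, 2000));
    }
}
```

Note that a successful connection only shows that something is listening on the port. The metadata request can still time out if the broker advertises an address the client cannot reach, which is the listener problem mentioned above.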
Hope that helps!
Upvotes: 0
Reputation: 143
That means your brokers are not running. Check server.log and restart the broker if necessary.
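While checking the broker, it can help to make the send fail sooner than the default 60 seconds. The 60000 ms in the error message corresponds to the producer setting max.block.ms. Below is a rough sketch of a producer config map with that value lowered. The class name FailFastProducerProps and the 5000 ms value are assumptions for illustration, and plain string keys are used instead of the ProducerConfig constants so the snippet compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class FailFastProducerProps {

    /** Builds producer properties like the question's producerConfigs(), plus a shorter max.block.ms. */
    public static Map<String, Object> producerConfigs(String bootstrapServers, int maxBlockMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on how long send() may block waiting for metadata (default 60000 ms).
        props.put("max.block.ms", maxBlockMs);
        return props;
    }

    public static void main(String[] args) {
        // 5000 ms is an arbitrary example value.
        System.out.println(producerConfigs("localhost:9092", 5000));
    }
}
```

This does not fix the underlying problem; it only shortens the wait before the TimeoutException shows up in the log.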
Upvotes: 0
Reputation: 191884
Use the method that includes a key
kafkaTemplate.send(topic, key, payload);
It's not clear what key value you want to use, but it should distribute evenly across the partitions of the topic. For example, a random number within the range of the partition count.
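A small sketch of that idea, assuming you know the partition count up front. KeyChooser and randomKey are hypothetical names, and the key is returned as a String to match the KafkaTemplate<String, String> in the question.

```java
import java.util.concurrent.ThreadLocalRandom;

public class KeyChooser {

    /** Returns a random key in the range [0, partitionCount) as a String. */
    public static String randomKey(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return Integer.toString(ThreadLocalRandom.current().nextInt(partitionCount));
    }

    public static void main(String[] args) {
        // 3 is an example partition count, not a value from the question.
        System.out.println("key: " + randomKey(3));
    }
}
```

It would be used as kafkaTemplate.send(topic, KeyChooser.randomKey(partitionCount), payload). Keep in mind that the default partitioner hashes the key, so key "2" does not necessarily land on partition 2; the keys only spread the messages roughly evenly.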
Upvotes: 0