Reputation: 10139
I would like to set a request timeout, so I added the request.timeout.ms parameter. But when I intentionally break the connection to the broker, no timeout error occurs.
What am I missing in this configuration? Do I need to modify the server settings as well?
public void init() {
    LOGGER.info("initializing KafkaProducer: Topic Name: {}", topic);
    // println does not expand "{}" placeholders, so append the topic explicitly.
    System.out.println("initializing KafkaProducer: Topic Name: " + topic);
    Properties properties = new Properties();
    properties.put("bootstrap.servers", brokerList);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");
    properties.put("retries", "3");
    properties.put("linger.ms", 5);
    properties.put("block.on.buffer.full", false);
    properties.put("request.timeout.ms", "1000");
    //properties.put("metadata.fetch.timeout.ms", 1000);
    producer = new KafkaProducer<>(properties);
}
public void produce(String txnLogStr) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, txnLogStr);
    producer.send(record, new ProducerCallback());
}
private class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // On failure recordMetadata may be null, so check the exception
        // before reading the offset.
        if (e != null) {
            LOGGER.error("Kafka Queue problem. Topic: {}", topic, e);
            e.printStackTrace();
        } else {
            System.out.println("No error, offset: " + recordMetadata.offset());
        }
    }
}
Kafka version: kafka_2.11-0.10.2.0
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
</dependencies>
Upvotes: 3
Views: 6658
Reputation: 10139
It worked with the following property setting:
properties.put("metadata.fetch.timeout.ms", "1000");
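A likely explanation, going by the producer configuration documentation: request.timeout.ms only limits how long the client waits for a response to a request it has already sent, while a send() call that cannot obtain topic metadata (for example because the broker is unreachable) blocks for a separate interval. In the 0.10.x client, metadata.fetch.timeout.ms is documented as deprecated in favour of max.block.ms, so a rough sketch of the same configuration with the newer key might look like the following. The class name ProducerTimeoutConfig, the helper buildProducerProperties, and the localhost:9092 address are hypothetical and only serve the illustration; the sketch builds the Properties object without creating a KafkaProducer.

```java
import java.util.Properties;

class ProducerTimeoutConfig {

    // Hypothetical helper: collects producer settings that bound both the
    // blocking time of send() and the wait for a broker response.
    static Properties buildProducerProperties(String brokerList, long maxBlockMs) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "1");
        properties.put("retries", "3");
        properties.put("linger.ms", "5");
        // Upper bound on how long send() may block, e.g. while waiting for metadata.
        properties.put("max.block.ms", Long.toString(maxBlockMs));
        // Upper bound on waiting for a response to a request that was already sent.
        properties.put("request.timeout.ms", "1000");
        return properties;
    }

    public static void main(String[] args) {
        // Assumed broker address, for illustration only.
        Properties properties = buildProducerProperties("localhost:9092", 1000L);
        properties.list(System.out);
    }
}
```

The resulting Properties object can be passed to new KafkaProducer<>(properties) in init() in place of the hand-built one. The deprecated block.on.buffer.full key is left out on purpose, since max.block.ms is documented as its replacement as well.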
Upvotes: 2