Reputation: 4066
I am using the Kafka Java client 0.11.0 and Kafka server 2.11-0.10.2.0.
My code:
KafkaManager
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaManager {

    // Single instance for producer per topic
    private static Producer<String, String> karmaProducer = null;

    /**
     * Initialize Producer
     *
     * @throws Exception
     */
    private static void initProducer() throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.kafkaUrl);
        props.put(ProducerConfig.RETRIES_CONFIG, Constants.retries);
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, Constants.batchSize);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Constants.requestTimeout);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, Constants.linger);
        //props.put(ProducerConfig.ACKS_CONFIG, Constants.acks);
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Constants.bufferMemory);
        //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Constants.maxBlock);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, Constants.kafkaProducer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try {
            karmaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        }
        catch (Exception e) {
            throw e;
        }
    }

    /**
     * get Producer based on topic
     *
     * @return
     * @throws Exception
     */
    public static Producer<String, String> getKarmaProducer(String topic) throws Exception {
        switch (topic) {
            case Constants.topicKarma:
                if (karmaProducer == null) {
                    synchronized (KafkaProducer.class) {
                        if (karmaProducer == null) {
                            initProducer();
                        }
                    }
                }
                return karmaProducer;
            default:
                return null;
        }
    }

    /**
     * Flush and close kafka producer
     *
     * @throws Exception
     */
    public static void closeKafkaInstance() throws Exception {
        try {
            karmaProducer.flush();
            karmaProducer.close();
        } catch (Exception e) {
            throw e;
        }
    }
}
Kafka Producer
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducer {

    public void sentToKafka(String topic, String data) {
        Producer<String, String> producer = null;
        try {
            producer = KafkaManager.getKarmaProducer(topic);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, data);
            producer.send(producerRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Main Class
public class App {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello World! I am producing to stream " + Constants.topicKarma);
        String value = "google";
        KafkaProducer kafkaProducer = new KafkaProducer();

        for (int i = 1; i <= 1; i++) {
            kafkaProducer.sentToKafka(Constants.topicKarma, value + i);
            //Thread.sleep(100);
            System.out.println("Send data to producer=" + value);
            System.out.println("Send data to producer=" + value + i + " to topic=" + Constants.topicKarma);
        }
    }
}
What is my problem:
When my loop length is around 1000 (in class App), I am successfully able to send data to the Kafka topic.
But when my loop length is 1, or anything less than 10, I am not able to send data to the Kafka topic. Note that I am not getting any error.
From what I have found, if I try to send a single message to the Kafka topic with this program, I get a success message but never see the message on my topic.
But if I use Thread.sleep(10) (as you can see in my App class, I have commented it out), then I successfully send data to my topic.
Can you please explain why Kafka shows this ambiguous behaviour?
Upvotes: 0
Views: 5390
Reputation: 275
Can you add Thread.sleep(100); just before exiting main?
If I understand correctly, everything works well if you sleep for a small amount of time. If that's the case, it implies that your application is getting killed before the message is sent asynchronously.
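A minimal sketch of that change, reusing the App, KafkaProducer, and Constants classes from your question (the sleep duration is just an illustrative value):

public class App {
    public static void main(String[] args) throws InterruptedException {
        KafkaProducer kafkaProducer = new KafkaProducer();
        for (int i = 1; i <= 1; i++) {
            kafkaProducer.sentToKafka(Constants.topicKarma, "google" + i);
        }
        // Give the producer's background sender thread time to drain its
        // buffer before the JVM exits and kills it.
        Thread.sleep(100);
    }
}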
Upvotes: 1
Reputation: 4334
Each call to KafkaProducer.send() returns a Future. You can use the last of those Futures to block the main thread before exiting. Even easier, you can just call KafkaProducer.flush() after sending all your messages: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
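For illustration, here is a sketch of both options; the SendExamples class and its method names are made up for this answer and are not part of your code:

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendExamples {

    // Option 1: block on the Future returned by send() for the last record.
    public static void sendLastAndWait(Producer<String, String> producer,
                                       String topic, String data) throws Exception {
        Future<RecordMetadata> last = producer.send(new ProducerRecord<>(topic, data));
        last.get(); // returns only once the record has been acknowledged (or throws)
    }

    // Option 2: send, then flush() so every buffered record goes out
    // before the application exits.
    public static void sendAndFlush(Producer<String, String> producer,
                                    String topic, String data) {
        producer.send(new ProducerRecord<>(topic, data));
        producer.flush(); // blocks until all in-flight records complete
    }
}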
Upvotes: 3
Reputation: 10075
You are facing this problem because the producer sends asynchronously. When you call send(), the message is put into an internal buffer so that a bigger batch can be built up and more messages can be sent in one shot. This batching feature is configured with batch.size and linger.ms: messages are sent out once the batch reaches that size or the linger time elapses.
I have replied to something similar here: Cannot produce Message when Main Thread sleep less than 1000
You do say "When my loop length is around 1000 (in class App), I am successfully able to send data to the Kafka topic" ... but maybe you don't see all of the sent messages, because the latest batch isn't sent. With a shorter loop the above conditions aren't reached in time, so you shut down the application before the producer has had enough time/batch size to send.
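As a rough sketch of how those two settings interact with a short-lived producer (the broker address, topic name, and property values below are placeholders, not recommendations):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Batching: a record waits in the buffer until either batch.size bytes
        // have accumulated for its partition or linger.ms has elapsed.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // example value, in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // example value, in ms

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("karma-topic", "google1")); // example topic
        // close() flushes whatever is still buffered before shutting down,
        // so a short-lived application does not lose its last batch.
        producer.close();
    }
}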
Upvotes: 1