Harshit Sharma
Harshit Sharma

Reputation: 313

Naive Kafka Producer not working when I try to produce messages that are read from a file

I am trying to write a naive Kafka Producer using Java. The application accepts two inputs:

  1. Kafka topic name to which messages are to be produced
  2. Path of file containing messages to be produced to Kafka

I wrote the following code. When I run it, I see that the System.out.println statements print the expected values but messages are not produced to Kafka for some reason. What should I change to make it work?

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    @SuppressWarnings("resource")
    public static void main(String[] args) throws IOException {
        try {
            if(args.length != 0) {
                topic = args[0];
                File file = new File(args[1]);
                br = new BufferedReader((Reader) new FileReader(file));
            }
        } catch (Exception e) {
            System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
            e.printStackTrace();
        }

        String msg;
        while ((msg = br.readLine()) != null) {
            System.out.println("Message to publish : " + msg);
            System.out.println("Topic : " + topic);
            producer.send(new ProducerRecord<String, String>(topic, "", msg));
        }
        return;
    }
}

Surprisingly the following code works (in which I have hard-coded everything):

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    public static void main(String[] args) throws IOException {
            try {
                String[] msgs = new String[2];
                msgs[0] = "message 1";
                msgs[1] = "message 2";
                topic = "mytopic"
                for(String msg:msgs){
                    producer.send(new ProducerRecord<String, String>(topic, "", msg));
                }
                producer.close();
            } catch (Exception e) {
                System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
                e.printStackTrace();
            }
    }
}

Upvotes: 0

Views: 68

Answers (1)

Natalia
Natalia

Reputation: 4532

there is a critial method invoked in the second snippet and missing in the first

producer.close();

from documenttion for that method:

Close this producer. This method blocks until all previously sent requests complete.

When you invoke method produce, it doesn't mean in reality that the message was produced. Method returns you future. You can wait for each message to be produced by invoking get() on each result of produce method.

Upvotes: 1

Related Questions