Deepak Garg

Reputation: 159

Apache Kafka Java Producer Scala Consumer missing streams

I am using Apache Kafka. I have built a WAR file in which the producer is written in Java and the consumer is written in Scala. The producer gets its data from an HTML page. Most of the data published by the producer shows up on the consumer, but some of it is missing.

Here is my producer code:

File 1

package com.cts.rest;

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class Configuration {

    static ProducerConfig setKafkaProducerParameter() {
        Properties properties = new Properties();
        properties.put("zk.connect", "localhost:2181");
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("acks", 0);
        ProducerConfig producerConfig = new ProducerConfig(properties);
        return producerConfig;
    }

}

File 2

package com.cts.rest;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public class RTTSKProducer {

    static void sendDataToProducer(String line) {

        // Build the producer from the settings defined in the Configuration class
        ProducerConfig producerConfig = Configuration.setKafkaProducerParameter();
        Producer<String, String> producer = new Producer<String, String>(producerConfig);

        String topic = "jsondata";
        KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, line);
        System.out.print(msg);
        producer.send(msg);
        // A new producer is created and closed for every message
        producer.close();
    }
}

I am checking the messages on the consumer side with the following command:

bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic jsondata --from-beginning

Am I missing any producer configuration?

Upvotes: 1

Views: 270

Answers (1)

amethystic

Reputation: 7089

You could try increasing the 'acks' setting to get stronger durability guarantees. Most importantly, you should call the 'send' method with a callback so that you can handle messages that were not published to Kafka successfully, as below:

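// Callback and RecordMetadata come from org.apache.kafka.clients.producer
producer.send(myRecord,
           new Callback() {
               public void onCompletion(RecordMetadata metadata, Exception e) {
                   if (e != null) {
                       // The send failed, so do not rely on metadata here
                       e.printStackTrace();
                   } else {
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               }
           });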
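
For the 'acks' part, here is a minimal sketch of settings you might pass to the newer org.apache.kafka.clients.producer.KafkaProducer, which is the API the callback above belongs to. The class name ReliableProducerSettings and the method build are made up for illustration, and the values (acks=all, three retries, localhost:9092) are assumptions to tune for your setup, not recommendations. Note that the older kafka.producer API used in the question names its acknowledgement setting differently (request.required.acks, as far as I know), so a plain "acks" key may not have any effect there.

```java
import java.util.Properties;

// Hypothetical helper: only builds the settings, it does not create a producer.
final class ReliableProducerSettings {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Broker address list used by the newer producer for its initial connection
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each record (assumed choice)
        props.setProperty("acks", "all");
        // Retry transient send failures a few times (assumed value)
        props.setProperty("retries", "3");
        // String keys and values, matching the String payload in the question
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be checked before wiring up a producer
        build("localhost:9092").list(System.out);
    }
}
```

The values are set as strings with setProperty on purpose: Properties.getProperty returns null for a value that is not a String, so an entry added with put("acks", 0) can be invisible to code that reads the settings through getProperty.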

Upvotes: 1
