Reputation: 195
Producing Messages using Kafka
import java.util.Date;
import java.util.Properties;
import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {
    private static Producer<String, String> producer;

    public KafkaProducer()
    {
        Properties props = new Properties();
        props.put("metadata.broker.list","localhost:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks","1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String,String>(config);
    }

    public static void main(String[] args)
    {
        if(args.length<2)
        {
            System.err.println("Usage: KafkaProducer TopicName MessageCount");
            System.exit(0);
        }
        String topic = args[0];
        int messageCount = Integer.parseInt(args[1]);
        KafkaProducer kafka = new KafkaProducer();
        kafka.publishMessage(topic,messageCount);
    }

    private void publishMessage(String topic, int messageCount)
    {
        for(int mcount=0;mcount<messageCount;mcount++)
        {
            String runtime = new Date().toString();
            String msg = "Message Published Time -" + runtime;
            System.out.println(msg);
            KeyedMessage<String,String> data = new KeyedMessage<String,String>(topic,msg);
            producer.send(data);
        }
        producer.close();
    }
}
When I run this program from Eclipse, I get the following exception:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Message Published Time -Fri Jul 10 13:05:20 IST 2015
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at KafkaProducer.publishMessage(KafkaProducer.java:52)
at KafkaProducer.main(KafkaProducer.java:39)
The ZooKeeper service and the broker are running, the topic has been created, and a consumer is ready.
Can anyone help me with this issue?
Upvotes: 1
Views: 2747
Reputation: 151
Please check that your producer points at the port your Kafka broker is actually listening on.
Kafka brokers listen on port 9092 by default, while 2181 is ZooKeeper's default client port. Your producer sets metadata.broker.list to localhost:2181, so it is most likely trying to reach ZooKeeper rather than the broker. If your broker runs on 9092, use that port in the producer configuration.
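As a rough sketch of that change, the snippet below builds the same old-producer settings as the question, but with the broker port, and adds a small sanity check for the ZooKeeper port. It assumes the broker listens on the default 9092 (check your server.properties if unsure). The class OldProducerSettings and its methods are hypothetical helpers written for this illustration, not part of Kafka; only the property keys come from the question.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of the Kafka API.
class OldProducerSettings {

    // ZooKeeper's default client port. Brokers default to 9092 instead.
    static final String ZOOKEEPER_DEFAULT_PORT = "2181";

    // Builds the same settings as the question, with a caller-supplied broker list.
    static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        return props;
    }

    // Returns true if any host:port entry in the list uses ZooKeeper's default port.
    static boolean pointsAtZooKeeper(String brokerList) {
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon >= 0 && trimmed.substring(colon + 1).equals(ZOOKEEPER_DEFAULT_PORT)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The value from the question is flagged as suspicious.
        System.out.println(pointsAtZooKeeper("localhost:2181")); // prints true

        // Assumed fix: point the producer at the broker's port instead.
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("metadata.broker.list")); // prints localhost:9092
    }
}
```

The resulting Properties object can be passed to new ProducerConfig(props) exactly as in the question's constructor.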
Upvotes: 0
Reputation: 37785
You can try the new KafkaProducer, which is briefly described in the Kafka docs.
Note that it is imported from org.apache.kafka.clients.producer.*
instead of kafka.javaapi.producer.Producer.
Something like:
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // set up Kafka producer; bootstrap.servers must point at a broker, not at ZooKeeper
        KafkaProducer<String,String> kafkaProducer;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // instantiate the producer
        kafkaProducer = new KafkaProducer<String,String>(props);
        // add data to kafka
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("topic", "test key", "test value");
        // send() is asynchronous; get() waits for the result and throws if the send failed
        kafkaProducer.send(producerRecord).get();
        // close producer
        kafkaProducer.close();
    }
}
Upvotes: 1