Reputation: 113
I am trying to compile and run a simple Kafka code sample from Apache. When I compile and run it, I get the following exception, even after adding all the library files for Scala (I guess).
Exception in thread "main" java.lang.NullPointerException
at scala.Predef$.Integer2int(Predef.scala:303)
at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:103)
at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:44)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:44)
at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:102)
at kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
at kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
at kafka.producer.Producer.<init>(Producer.scala:60)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at kafkaTest.TestProducer.main(TestProducer.java:23)
This is my program:
package kafkaTest;

import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        // long events = Long.parseLong(args[0]);
        long events = 10L;
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "kafkaTest.SimplePartitioner"); // this is line no 23
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2.1" + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}
Attached is a screenshot of the library files.
Please let me know the cause of the error/exception.
Edit: this is SimplePartitioner.java
package kafkaTest;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1))
                    % a_numPartitions;
        }
        return partition;
    }
}
Upvotes: 1
Views: 1048
Reputation: 599
I also got this error when metadata.broker.list has a broker with no port number.
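One way to catch this before constructing the producer is to check each entry yourself. The following is only a sketch: BrokerListCheck and invalidEntries are hypothetical names, not part of the Kafka API, and the rule it applies (host, colon, numeric port, no surrounding whitespace) is my assumption about what a well-formed entry looks like, not a copy of Kafka's own parsing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Kafka class.
class BrokerListCheck {
    // Returns every comma-separated entry that is not of the form host:port
    // with a purely numeric port. Entries are checked exactly as written,
    // so stray whitespace also makes an entry invalid.
    static List<String> invalidEntries(String brokerList) {
        List<String> invalid = new ArrayList<String>();
        for (String entry : brokerList.split(",")) {
            int colon = entry.lastIndexOf(':');
            // Need a non-empty host before the colon and at least one char after it.
            boolean ok = colon > 0 && colon < entry.length() - 1;
            for (int i = colon + 1; ok && i < entry.length(); i++) {
                char c = entry.charAt(i);
                if (c < '0' || c > '9') {
                    ok = false;
                }
            }
            if (!ok) {
                invalid.add(entry);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        // Second broker has no port.
        System.out.println(invalidEntries("broker1:9092,broker2"));
        // Second broker has a trailing space after the port.
        System.out.println(invalidEntries("broker1:9092,broker2:9092 "));
    }
}
```

If the returned list is non-empty, you could fail fast with a readable message instead of hitting the NullPointerException inside parseBrokerList.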
Upvotes: 0
Reputation: 110
There's a space at the end of your broker list:
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
Remove it and it should work fine:
props.put("metadata.broker.list", "broker1:9092,broker2:9092");
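If the broker list comes from a config file or a command-line argument instead of a literal, stray whitespace is easy to miss. Here is a minimal sketch of trimming each entry before putting the value into the producer properties. BrokerListCleanup and normalize are hypothetical helper names, not Kafka classes, and the sketch uses only java.util.Properties so it runs without the Kafka jars.

```java
import java.util.Properties;

// Hypothetical helper, not a Kafka class.
class BrokerListCleanup {
    // Trims whitespace around each comma-separated entry and rejoins them.
    static String normalize(String brokerList) {
        StringBuilder sb = new StringBuilder();
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries, e.g. from a doubled comma
            }
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(trimmed);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", normalize("broker1:9092, broker2:9092 "));
        // Brackets make any leftover whitespace visible in the output.
        System.out.println("[" + props.getProperty("metadata.broker.list") + "]");
    }
}
```

The same props object can then be passed to ProducerConfig as in the question.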
Upvotes: 1