Reputation: 580
I am working on a POC which will read messages from Kafka and process it through Storm in real time. I have started a local Zookeeper and Kafka. I created a topic (named test
), producer and consumer and they are working fine from the command prompt. Now I wanted to read the messages from the topic using Storm. When I try to run the below code the Storm spout is not getting connected to the Kafka/Zookeeper. This is obvious from the log since there is no mention of localhost or 2181 anywhere. And the process fails with the exception
6939 [Thread-15-eventsEmitter-executor[2 2]] INFO o.a.s.k.PartitionManager - Read partition information from: /test/storm/partition_0 --> null
public class TestTopology {
public static void main(String[] args) {
BrokerHosts zkHosts = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "test", "/test", "storm");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("eventsEmitter", kafkaSpout, 1);
builder.setBolt("eventsProcessor", new WordCountBolt(), 1).shuffleGrouping("eventsEmitter");
Config config = new Config();
config.setMaxTaskParallelism(5);
/*
* config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
*
* config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
* config.put(Config.STORM_ZOOKEEPER_SERVERS,
* Arrays.asList("localhost"));
*/
try {
ILocalCluster cls = new LocalCluster();
cls.submitTopology("my-topology", config, builder.createTopology());
} catch (Exception e) {
throw new IllegalStateException("Couldn't initialize the topology",
e);
}
}
}
It's connecting the local ZooKeeper which its creating and not to the one which is running the Kafka
4632 [Thread-11] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4632 [Thread-11] INFO o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@acd1da
4633 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
4634 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2000, initiating session
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:62287
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:62287
4635 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154d458c4130011 with negotiated timeout 20000 for client /127.0.0.1:62287
4635 [Thread-11-SendThread(127.0.0.1:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2000, sessionid = 0x154d458c4130011, negotiated timeout = 20000
4635 [Thread-11-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
Kindly let me know if you need more info.
Upvotes: 0
Views: 591
Reputation: 580
After a nerve wracking night I hit upon the solution for this. Actually the problem was not with the code but with the Jars. I had attached log4j jars from all the 3 packages namely zookeeper,kafka and storm.But the code was expecting only one. This was showing as red warning in my eclipse which I had ignored earlier. When I removed the unnecessary log4js the kafka spout started reading from the Kafka topic which I had created. Thank you all for taking the time to look into this issue. @Matthias I suppose since I had linked it to the Zookeeper it connects to whatever kafka is managed by that Zookeeper. So mentioning that may not be necessary atleast at the local level.But Thanks anyway..
Upvotes: 1
Reputation: 756
You have to configure the config
so that it'll know the Kafka server port, for example as follows:
Properties props = new Properties();
//default broker port = 9092
props.put("metadata.broker.list", "localhost:" + BROKER_PORT);
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
Config config = new Config();
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
config.setDebug(true);
config.setMaxTaskParallelism(5);
Upvotes: 0