dsafa

Reputation: 793

Storm - KafkaSpout fails on open()

For the past two days, I’ve been trying to implement a KafkaSpout within our topology. Here is some important information.

All three services are running on the same instance. Kafka’s brokers use the default port 9092, with advertised.listeners set to PLAINTEXT://localhost:9092. Zookeeper uses the default client port 2181, and the Storm Nimbus host name has been set to localhost as well.

A custom Kafka producer creates log messages successfully, and by inspecting the /brokers path with the zkCli Zookeeper script I’ve confirmed that the partitions and other relevant information are stored correctly.

However, I keep getting an error when activating and then monitoring the topology. Here is the source code of the Storm topology I’ve implemented:

// Zookeeper ensemble that Kafka registers its brokers in
BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");

// Arguments: broker hosts, topic, Zookeeper root for offsets, consumer id
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/", "bytes" + UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Arrays.asList("127.0.0.1");
spoutConfig.zkPort = 2181;

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

// Wire the spout ("bytes") to the processing bolt ("byteSize")
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes");

StormTopology topology = builder.createTopology();

Config config = new Config();

StormSubmitter.submitTopology("topology", config, topology);

The error message I keep getting when executing bin/storm monitor <topology_name> -m bytes is the following:

Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)

By inspecting the worker logs (the worker.log file), I’ve concluded that the KafkaSpout fails in the open() method:

java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101]
... 5 more

Could someone explain why the KafkaSpout might fail in the open() method?

I would really appreciate your help!

Upvotes: 0

Views: 339

Answers (1)

Sarwesh Suman

Reputation: 149

From the error "java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy" it appears that the curator-client jar is missing from the classpath the worker runs with.

Please check whether the link below helps:

https://github.com/abhinavg6/Kafka-Storm-Conscomp/issues/1
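One way to confirm the diagnosis is to test whether the class can be loaded at all. The following is a minimal sketch, not part of Storm or Curator: the class name ClasspathCheck and the method isOnClasspath are hypothetical, and the check is only meaningful if it runs with the same classpath as the worker (for example, from inside the topology jar).

```java
// Hypothetical helper: reports whether a class can be resolved
// by the class loader that loaded this helper.
class ClasspathCheck {

    // Returns true if the named class can be loaded, false otherwise.
    static boolean isOnClasspath(String className) {
        try {
            // 'false' avoids running static initializers; we only
            // want to know whether the class can be found.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Class (or one of its dependencies) is not available.
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the worker.log stack trace.
        String name = "org.apache.curator.RetryPolicy";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this reports false, the Curator classes are probably not packaged with the topology and not provided by the worker's lib directory either, so the Curator dependency would need to be added to the jar that gets submitted.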

Upvotes: 0
