Fred Quatro
Fred Quatro

Reputation: 166

java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V

I am trying to set up a local cluster with Apache Storm to run a basic topology. I am currently using Kafka 2.11-0.8.2.1 and Storm 0.9.5. I have a spout and a bolt configured, and a couple of messages have passed through the pipeline successfully. After 2 messages are sent I get an exception, but I don't know why. It might be a compatibility/version issue with the jars: I added many more jars to get this running, but it still crashes after sending a couple of messages. The exception seems to come from the spout.

Topology Code is here:

package com.smartstart.storm.topology;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;

import org.apache.log4j.Logger;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import kafka.api.OffsetRequest;
import backtype.storm.LocalCluster;

import com.smartstart.storm.bolt.PingMessageBolt;

/**
 * Builds the PingMessage topology (a {@link KafkaSpout} feeding a
 * {@link PingMessageBolt}) and submits it to an in-process {@link LocalCluster}.
 *
 * <p>All settings are read from {@code PingTopology.properties} on the classpath.
 */
public class PingMessageTopology {
    public static final Logger LOG = Logger.getLogger(PingMessageTopology.class);
    private final BrokerHosts brokerHosts;

    /**
     * @param kafkaZk ZooKeeper connection string handed to {@link ZkHosts}
     */
    public PingMessageTopology(String kafkaZk) {
        brokerHosts = new ZkHosts(kafkaZk);
    }

    /**
     * Wires the Kafka spout to the bolt with shuffle grouping, four executors each.
     *
     * @param props settings; reads KAFKA_TOPIC, KAFKA_OFFSETMARKER and KAFKA_FORCE_FROM_START
     * @return the assembled topology
     */
    private StormTopology buildTopology(Properties props) {
        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, props.getProperty("KAFKA_TOPIC"),
                props.getProperty("KAFKA_OFFSETMARKER"), "PingMessage_Topology");

        // parseBoolean yields false for a missing (null) property, same as Boolean.valueOf.
        boolean forceFromstart = Boolean.parseBoolean(props.getProperty("KAFKA_FORCE_FROM_START"));
        kafkaConfig.forceFromStart = forceFromstart;
        LOG.debug("forceFromstartTrue "+forceFromstart);
        if (forceFromstart) {
            // Only overridden when replaying from the start; otherwise startOffsetTime
            // keeps whatever value SpoutConfig initialised it with.
            kafkaConfig.startOffsetTime = OffsetRequest.EarliestTime();
        }

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("PingMessage_Spout", new KafkaSpout(kafkaConfig), 4);
        builder.setBolt("PingMessage_Bolt", new PingMessageBolt(), 4).shuffleGrouping("PingMessage_Spout");
        return builder.createTopology();
    }

    /**
     * Loads the properties file, builds the topology and submits it to a local cluster.
     * Any failure is logged rather than propagated.
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        String propertiesFile = "PingTopology.properties";
        Properties props = new Properties();

        // try-with-resources closes the stream on every path; a null resource is
        // permitted and is simply not closed.
        try (InputStream in = ClassLoader.getSystemResourceAsStream(propertiesFile)) {
            LOG.info("in: " + in);
            if (in == null) {
                LOG.info("Please Specify the Properties.....");
                return;
            }
            props.load(in);

            PingMessageTopology kafkaSpoutTestTopology = new PingMessageTopology(props.getProperty("kafkaZk"));
            StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology(props);
            Config config = new Config();
            config.setDebug(true);

            // create a local Storm cluster
            LocalCluster cluster = new LocalCluster();  // necessary for local cluster to run
            cluster.submitTopology(props.getProperty("Topology_Name"), config, stormTopology);
        } catch (Exception e) {
            LOG.error("Check The Configuration File...", e);
        }
    }
}

Bolt code snippet that prints out the Logger messages is here:

package com.smartstart.storm.bolt;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoOptions;
import com.mongodb.ServerAddress;

public class PingMessageBolt extends BaseRichBolt {
    /* log4j logger shared by all instances of this bolt */
    public static final Logger LOG = Logger.getLogger(PingMessageBolt.class);

    /* MongoDB client and database handle; both are assigned in prepare() */
    private Mongo mongo;
    private DB db;
    /* Collections looked up in prepare(), only after authentication succeeds;
       they stay null otherwise */
    private DBCollection sessioncollection, messagecollection, messagecollectionarc,errorCollection;
    /* Collector received in prepare(); execute() uses it to ack each tuple */
    private OutputCollector collector;
    /* Value of the "URL" property, loaded in prepare() for use throughout the class */
    private String urlString;
    /* Common date formatter to use in insert and update.
       The quoted 't' and 'z' are emitted as literal lower-case letters.
       NOTE(review): confirm lower case is intended (ISO-8601 uses 'T' and 'Z').
       NOTE(review): SimpleDateFormat is not synchronized - confirm this instance
       is only ever used from a single thread. */
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd't'HH:mm:ss.SSS'z'");
    /* Value of the "API_KEY" property, loaded in prepare() */
    private String apiKey;

    /**
     * Initialises the bolt: stores the collector, loads {@code PingTopology.properties}
     * from the classpath, connects to MongoDB and, when authentication succeeds,
     * looks up the collections used by the bolt.
     *
     * <p>Failures are logged and not rethrown, so the Mongo fields may remain
     * {@code null} after this method returns.
     *
     * @param stromConf the Storm configuration (unused here)
     * @param context   the topology context (unused here)
     * @param collector collector kept for acking tuples in {@code execute}
     */
    @Override
    public void prepare(Map stromConf, TopologyContext context, OutputCollector collector) {
        String propertiesFile = "PingTopology.properties";
        Properties props = new Properties();
        this.collector = collector;
        // try-with-resources closes the stream on every path; a null resource is
        // permitted and is simply not closed.
        try (InputStream in = ClassLoader.getSystemResourceAsStream(propertiesFile)) {
            LOG.debug("bolt prepare--------------------------------");
            LOG.debug(in);
            if (in == null) {
                LOG.debug("Please check the configuration file....");
                return;
            }

            // Load before logging anything derived from the stream: reading the stream
            // for a debug dump first would leave nothing for load() to parse.
            props.load(in);
            // Log the property names only, so credentials never reach the log.
            LOG.debug("Loaded property names :" + props.stringPropertyNames());
            LOG.debug("-------------------------------------------");

            /*URL retrieving from properties file.*/
            urlString = props.getProperty("URL");
            apiKey = props.getProperty("API_KEY");

            // MONGO_HOST may hold several comma-separated hosts sharing MONGO_PORT.
            String mongoHost = props.getProperty("MONGO_HOST");
            List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>();
            if (mongoHost != null) {
                int port = Integer.parseInt(props.getProperty("MONGO_PORT"));
                for (String hostName : mongoHost.split(",")) {
                    // trim() tolerates "host1, host2" style lists.
                    serverAddresses.add(new ServerAddress(hostName.trim(), port));
                }
            }

            MongoOptions options = new MongoOptions();
            options.setAutoConnectRetry(true);
            LOG.debug(props.getProperty("MONGO_AUTOCONNECT_RETRY_TIME"));
            options.setMaxAutoConnectRetryTime(
                    Long.parseLong(props.getProperty("MONGO_AUTOCONNECT_RETRY_TIME")));
            mongo = new Mongo(serverAddresses, options);
            db = mongo.getDB(props.getProperty("DB_NAME"));

            boolean auth = db.authenticate(props.getProperty("MONGO_USER"), props.getProperty("MONGO_PWD").toCharArray());
            System.out.println("" + auth);

            if (auth) {
                sessioncollection = db.getCollection("clientSessions");
                messagecollection = db.getCollection("clientMessages");
                errorCollection = db.getCollection("clientErrorMessages");
                messagecollectionarc = db.getCollection("clientMessagesArc");
            } else {
                LOG.warn("incorrect user name or password", new Throwable(
                        "Incorrect UserName or Password"));
            }
        } catch (Exception e) {
            LOG.error("Exception in Bolt....", e);
        }
    }

    /**
     * Reads everything remaining in the given stream and returns it as one string,
     * decoded with the platform default charset.
     *
     * <p>The stream is consumed but not closed; closing remains the caller's job.
     *
     * @param is the stream to drain; may be {@code null}
     * @return the stream contents, or {@code ""} when the stream is {@code null} or empty
     */
    static String convertStreamToString(java.io.InputStream is) {
        if (is == null) {
            return "";
        }
        // "\\A" matches only at the beginning of input, so the first token is the whole stream.
        // The Scanner is deliberately left open: closing it would close the caller's stream.
        java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
        return s.hasNext() ? s.next() : "";
    }
    /**
     * Handles one tuple whose first value is a JSON document with an
     * {@code OPERATION} field ("INSERT" or "UPDATE") and a {@code SET} object.
     *
     * <p>The matching session is looked up by {@code SET.session_gui_id} and the
     * message is inserted or updated accordingly. Any failure is logged and, on a
     * best-effort basis, written to the error collection. The tuple is acked in
     * every case, so a failed tuple is not replayed.
     *
     * @param input the tuple to process
     */
    @Override
    public void execute(Tuple input) {
        try {
            // Check before touching value 0 so an empty tuple is skipped instead of
            // raising an index error.
            if (input.getValues().isEmpty()) {
                LOG.debug("Received an empty tuple; nothing to process");
                return;
            }

            String payload = input.getValue(0).toString();
            LOG.debug("bolt execute --------------------------------");
            LOG.debug("Tuple string :"+ payload);
            LOG.debug("-------------------------------------------");

            // The payload is parsed twice on purpose: the Mongo view supplies the
            // session id, the json-simple view is what the insert/update helpers take.
            BasicDBObject recvObj = (BasicDBObject) com.mongodb.util.JSON.parse(payload);
            BasicDBObject msgvalue = (BasicDBObject) recvObj.get("SET");

            JSONObject jsonTObj = (JSONObject) new JSONParser().parse(payload);
            JSONObject receivedJSONObject = (JSONObject) jsonTObj.get("SET");
            LOG.debug("receivedJSONObject " + receivedJSONObject);

            String operation = (String) jsonTObj.get("OPERATION");
            /*Written common query which is used in both insert and update*/
            BasicDBObject sessionbd = (BasicDBObject) sessioncollection
                    .findOne(querySpecification("sessionGuiId", (int) msgvalue.get("session_gui_id")));
            LOG.debug("sessionbd " + sessionbd);

            // A missing OPERATION makes the switch throw, which lands in the error
            // handler below like any other failure.
            switch (operation) {
            case "INSERT": {
                BasicDBObject msgobj = insertPingMessage(receivedJSONObject, sessionbd);
                LOG.debug("msgobj query: " + msgobj);
                break;
            }
            case "UPDATE": {
                updatePingMessage(receivedJSONObject, sessionbd);
                break;
            }
            default: {
                LOG.debug("Please mention correct operation...");
                break;
            }
            }
        } catch (Throwable e) {
            // Deliberately broad: nothing thrown while processing may escape this bolt.
            try {
                // Pass the throwable as the second argument so log4j prints its stack trace.
                LOG.error("Failed to process tuple", e);

                BasicDBObject errorInsert = new BasicDBObject();
                errorInsert.append("error", input.getValues());
                errorInsert.append("fromtopology", "PingMessage");
                errorInsert.append("exception", e.getMessage());

                StringBuilder stack = new StringBuilder();
                // getStackTrace() returns a fresh copy on each call, so fetch it once.
                for (StackTraceElement elem : e.getStackTrace()) {
                    stack.append(elem.getClassName()).append(".").append(elem.getMethodName())
                            .append("():").append(elem.getFileName()).append(":")
                            .append(elem.getLineNumber()).append(" \n");
                }
                errorInsert.append("exceptionstacktrace", stack.toString());
                errorCollection.insert(errorInsert);
            } catch (Throwable t) {
                // Recording the error is best effort (errorCollection may be unset).
                LOG.error("Failed to record the processing error", t);
            }
        } finally {
            collector.ack(input);
        }
    }

    /**
     * Closes the MongoDB client when one was created.
     */
    @Override
    public void cleanup() {
        // mongo is only assigned part-way through prepare(); if prepare() failed
        // earlier it is still null and there is nothing to close.
        if (mongo != null) {
            mongo.close();
        }
    }

    /**
     * Declares no output fields.
     *
     * @param declarer the declarer supplied by Storm; left untouched
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty: the visible execute() only acks tuples and emits nothing.
    }

Combined screen shots listing all the jars is here: enter image description here

The console is flooded with Logger messages including the Exception.
Here is the edited output:

2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:zookeeper.version=3.3.2-1031432, built on 11/05/2010 05:32 GMT
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:host.name=USTX04NB002865.corp.smartstartinc.com
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:java.version=1.8.0_102
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:java.vendor=Oracle Corporation
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:java.home=C:\Program Files\Java\jre1.8.0_102
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:java.class.path=C:\Users\Fred.Quatro\workspace\PingMessage\bin;C:\Users\Fred.Quatro\workspace\PingMessage\lib\log4j-1.2.16.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\metrics-core-2.1.2.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\metrics-core-2.1.5.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\scala-library-2.10.4.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\kafka-clients-0.8.2.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\commons-lang-2.6.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\snakeyaml-1.14.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\json-simple-1.1.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\mongo-2.10.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\guava-18.0.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\jersey-client.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\storm-core-0.9.5.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\storm-kafka-0.9.2-incubating.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\java-json-schema.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\netty-all-4.0.34.Final.jar;C:\JarLibrary\slf4j-log4j12-1.4.3.jar;C:\JarLibrary\slf4j-api-1.4.3.jar;C:\JarLibrary\commons-exec-1.1.jar;C:\JarLibrary\commons-io-2.4.jar;C:\JarLibrary\disruptor-2.10.1.jar;C:\JarLibrary\kryo-3.0.1.jar;C:\JarLibrary\objenesis-1.2.jar;C:\JarLibrary\minlog-1.2.jar;C:\JarLibrary\clojure-1.5.1.jar;C:\JarLibrary\carbonite-1.4.0.jar;C:\JarLibrary\chill_2.10-0.3.0.jar;C:\JarLibrary\chill-java-0.3.5.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\kafka_2.10-0.8.2.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\zookeeper-3.3.2.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\curator-client-2.3.0.jar;C:\JarLibrary\curator-framework-2.1.0-incubating.jar
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:java.library.path=C:\Program Files\Java\jre1.8.0_102\bin;C:\windows\Sun\Java\bin;C:\windows\system32;C:\windows;C:/Program Files/Java/jre1.8.0_102/bin/server;C:/Program Files/Java/jre1.8.0_102/bin;C:/Program Files/Java/jre1.8.0_102/lib/amd64;C:\ProgramData\Oracle\Java\javapath;C:\windows\system32;C:\windows;C:\windows\System32\Wbem;C:\windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\TortoiseSVN\bin;C:\apache-storm-0.9.5\bin;C:\Program Files\Java\jre1.8.0_102\bin;C:\Python27;C:\Python27\Lib\site-packages\;C:\Python27\Scripts\;;C:\Users\Fred.Quatro\Desktop;;.
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:java.io.tmpdir=C:\Users\FRED~1.QUA\AppData\Local\Temp\
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:java.compiler=<NA>
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:os.name=Windows 7
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:os.arch=amd64
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:os.version=6.1
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:user.name=Fred.Quatro
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:user.home=C:\Users\Fred.Quatro
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO  ZooKeeper:97 - Client environment:user.dir=C:\Users\Fred.Quatro\workspace\PingMessage
2016-08-25 08:24:55 [Thread-19-PingMessage_Spout] ERROR util:0 - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
    at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:167)
    at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
    at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
    at org.apache.curator.ConnectionState.reset(ConnectionState.java:210)
    at org.apache.curator.ConnectionState.start(ConnectionState.java:101)
    at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:232)
    at storm.kafka.ZkState.<init>(ZkState.java:62)
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522)
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Unknown Source)
****** edited *********
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:88 - 107e0749ffffff95ffffffd25174053a706f7274737107e0cffffffab5e58ffffff98707404706f72747107e01d70737107e0f0043,v{s{31,s{'world,'anyone}}},2  response:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout/e0000000002 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] ERROR util:0 - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
    at clojure.lang.RestFn.invoke(RestFn.java:423)
    at backtype.storm.daemon.worker$fn__4694$fn__4695.invoke(worker.clj:493)
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240)
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Unknown Source)
2016-08-25 08:24:55 [Thread-6-SendThread(127.0.0.1:2000)] DEBUG ClientCnxn:818 - Reading reply sessionid:0x156c1dfe5f1000b, packet:: clientPath:null serverPath:null finished:false header:: 91,1  replyHeader:: 91,43,0  request:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout/e,#ffffffacffffffed05737201f636c6f6a7572652e6c616e672e50657273697374656e7441727261794d6170ffffffd02836ffffff8f21ffffffe4ffffffa0f2024c055f6d6574617401d4c636c6f6a7572652f6c616e672f4950657273697374656e744d61703b5b056172726179740135b4c6a6176612f6c616e672f4f626a6563743b787201b636c6f6a7572652e6c616e672e4150657273697374656e744d6170784ffffffabffffffaa63ffffffef537020249055f6861736849075f6861736865717870ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff7075720135b4c6a6176612e6c616e672e4f626a6563743bffffff90ffffffce58ffffff9f1073296c200787000087372014636c6f6a7572652e6c616e672e4b6579776f72643931ffffffc4627a595b82034904686173684c045f737472740124c6a6176612f6c616e672f537472696e673b4c0373796d740154c636c6f6a7572652f6c616e672f53796d626f6c3b787024fffffffdffffff84ffffff84740a3a74696d652d736563737372013636c6f6a7572652e6c616e672e53796d626f6cffffffbcffffff897ffffffed1cffffffa744ffffffc32054904686173684c055f6d6574617107e014c045f7374727107e084c046e616d657107e084c026e737107e087870ffffff86ffffffc6affffffcb70740974696d652d736563737107e0e7073720116a6176612e6c616e672e496e746567657212ffffffe2ffffffa0ffffffa4fffffff7ffffff81ffffff8738201490576616c756578720106a6176612e6c616e672e4e756d626572ffffff86ffffffacffffff951dbffffff94ffffffe0ffffff8b200787057ffffffbefffffff1ffffffa7737107e07ffffffb3515bffffffec70737107e0c1519ffffffe233707074056572726f7270744676a6176612e6c616e672e4e6f537563684d6574686f644572726f723a206f72672e6170616368652e7a6f6f6b65657065722e5a6f6f4b65657065722e3c696e69743e284c6a6176612f6c616e672f537472696e673b494c6f72672f6170616368652f7a6f6f6b65657065722f576174636865723b5a2956da96174206f72672e6170616368652e63757261746f722e7574696c732e44656661756c745a6f6f6b6565706572466163746f72792e6e657
75a6f6f4b65657065722844656661756c745a6f6f6b6565706572466163746f72792e6a6176613a323929da96174206f72672e6170616368652e63757261746f722e6672616d65776f726b2e696d70732e43757261746f724672616d65776f726b496d706c24322e6e65775a6f6f4b65657065722843757261746f724672616d65776f726b496d706c2e6a6176613a31363729da96174206f72672e6170616368652e63757261746f722e48616e646c65486f6c64657224312e6765745a6f6f4b65657065722848616e646c65486f6c6465722e6a6176613a393429da96174206f72672e6170616368652e63757261746f722e48616e646c65486f6c6465722e6765745a6f6f4b65657065722848616e646c65486f6c6465722e6a6176613a353529da96174206f72672e6170616368652e63757261746f722e436f6e6e656374696f6e53746174652e726573657428436f6e6e656374696f6e53746174652e6a6176613a32313029da96174206f72672e6170616368652e63757261746f722e436f6e6e656374696f6e53746174652e737461727428436f6e6e656374696f6e53746174652e6a6176613a31303129da96174206f72672e6170616368652e63757261746f722e43757261746f725a6f6f6b6565706572436c69656e742e73746172742843757261746f725a6f6f6b6565706572436c69656e742e6a6176613a31383829da96174206f72672e6170616368652e63757261746f722e6672616d65776f726b2e696d70732e43757261746f724672616d65776f726b496d706c2e73746172742843757261746f724672616d65776f726b496d706c2e6a6176613a32333229da961742073746f726d2e6b61666b612e5a6b53746174652e3c696e69743e285a6b53746174652e6a6176613a363229da961742073746f726d2e6b61666b612e4b61666b6153706f75742e6f70656e284b61666b6153706f75742e6a6176613a383529da96174206261636b747970652e73746f726d2e6461656d6f6e2e6578656375746f7224666e5f5f3333373124666e5f5f333338362e696e766f6b65286578656375746f722e636c6a3a35323229da96174206261636b747970652e73746f726d2e7574696c246173796e635f6c6f6f7024666e5f5f3436302e696e766f6b65287574696c2e636c6a3a34363129da9617420636c6f6a7572652e6c616e672e41466e2e72756e2841466e2e6a6176613a323429da96174206a6176612e6c616e672e5468726561642e72756e28556e6b6e6f776e20536f7572636529da737107e0748ffffffe94e4470737107e0cffffffaaffffffb1ffffffd4ffffff8b70707404686f737470740255553545830344e423030323836352e636f72702e736d6172747
374617274696e632e636f6d737107e0749ffffff95ffffffd25174053a706f7274737107e0cffffffab5e58ffffff98707404706f72747107e01d70737107e0f0043,v{s{31,s{'world,'anyone}}},2  response:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout/e0000000003 
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:88 - Processing request:: sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5c zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:160 - sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5c zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:88 - Processing request:: sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout
2016-08-25 08:24:55 [Thread-6-SendThread(127.0.0.1:2000)] DEBUG ClientCnxn:818 - Reading reply sessionid:0x156c1dfe5f1000b, packet:: clientPath:null serverPath:null finished:false header:: 92,12  replyHeader:: 92,43,0  request:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout,F  response:: v{'e0000000003,'e0000000002,'e0000000001,'e0000000000},s{34,34,1472131495046,1472131495046,0,4,0,0,1,4,43} 
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:160 - sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout

Upvotes: 0

Views: 4588

Answers (3)

markthegrea
markthegrea

Reputation: 3851

We got this error when we added Apache Collections4 to our pom:

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.0</version>
    </dependency>

We needed the ListUtils.partition() method. Removing this jar fixed it.

Upvotes: 0

Fred Quatro
Fred Quatro

Reputation: 166

Many other jars are needed to run Kafka with Storm, e.g. Curator, Scala, YAML, and Clojure, to name a few. All of the jars must be compatible with one another, otherwise you get the dreaded NoClassDefFoundError. This is probably not the only answer, but here is a list that worked for me in case it helps anyone:

enter image description here

Upvotes: 3

Darpan27
Darpan27

Reputation: 237

Something is missing in your KafkaSpout configuration. Make sure your ZooKeeper (the one you passed in brokerHosts) is working. You can easily check it with zkCli.sh and look at the broker ids. I would check the SpoutConfig properties and whether Kafka/ZooKeeper is running correctly. I have the following SpoutConfig details:

// Example KafkaSpout configuration; host, topic, offset directory and id are placeholders.
// The first argument is a BrokerHosts (here ZkHosts), not a plain connection string.
SpoutConfig sc = new SpoutConfig(new ZkHosts("localhost:2181"), "topicname", "/offset-dir", "id");
sc.bufferSizeBytes = 1024*1024*4;
sc.fetchSizeBytes = 1024*1024*4;
sc.forceFromStart = true;
sc.scheme = new SchemeAsMultiScheme(new StringScheme());

You didn't set the number of tasks on the spout/bolt.

// Four executors per component, each explicitly limited to a single task.
builder.setSpout("PingMessage_Spout", new KafkaSpout(kafkaConfig), 4).setNumTasks(1);
        builder.setBolt("PingMessage_Bolt", new PingMessageBolt(), 4).setNumTasks(1).shuffleGrouping("PingMessage_Spout");
        return builder.createTopology();

I believe ZooKeeper is getting stopped. I have a similar configuration and it's working.

Upvotes: 0

Related Questions