u12345

Reputation: 389

Integrating the Storm word count topology with Kafka

I am trying to integrate Storm's word count program with Kafka. My producer is working fine: it reads a text file and sends each line as a message, and I can see those messages in the simple console consumer. To get those messages/lines into Storm, I just replaced the original spout of the word count program with the Kafka spout from the Storm-Kafka integration dependency; the rest of the program is unchanged. I am trying to run it in Eclipse, but it does not execute. I don't know what the problem is, or even whether I am going about this the right way. Here is my main class:

package com.spnotes.storm;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

import com.spnotes.storm.bolts.WordCounterBolt;
import com.spnotes.storm.bolts.WordSpitterBolt;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        BrokerHosts hosts = new ZkHosts("localhost:9092");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "localhost:2181", "id1");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("line-reader-spout", kafkaSpout);

        builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");

        builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");

        LocalCluster cluster = new LocalCluster();
        System.out.println("submit topology");
        Thread.sleep(10000);
        //StormSubmitter.submitTopology("HelloStorm5", config, builder.createTopology());
        cluster.submitTopology("HelloStorm5", config, builder.createTopology());
        cluster.shutdown();
    }

}

There are two bolts, WordSpitterBolt and WordCounterBolt. WordSpitterBolt breaks each line/message into tokens (words), and WordCounterBolt counts each word. Can anybody tell me whether I am doing anything wrong? Do I need to write my own spout instead of using the predefined KafkaSpout? And is my main class correct?

Upvotes: 2

Views: 770

Answers (1)

Gaurav Mishra

Reputation: 1019

Change this line:

        BrokerHosts hosts = new ZkHosts(zkConnect);

zkConnect is the ZooKeeper host name and port, not Kafka's. Change it to localhost:2181.
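To illustrate the mix-up: 2181 is ZooKeeper's default client port and 9092 is Kafka's default broker port, so a connect string that names port 9092 is most likely a broker address. Below is a minimal sketch of a pre-flight check you could run before building the spout. The class ZkConnectCheck and its methods are hypothetical names made up for this example; they are not part of Storm or Kafka, and the check is only a heuristic based on default ports.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Storm or Kafka API): flags connect strings
// that look like a Kafka broker address rather than a ZooKeeper one.
final class ZkConnectCheck {

    // Default ports; a real deployment may use different ones.
    static final int ZOOKEEPER_DEFAULT_PORT = 2181;
    static final int KAFKA_DEFAULT_PORT = 9092;

    // Extracts the ports from a connect string such as
    // "host1:2181,host2:2181/chroot". The optional chroot path is ignored.
    static List<Integer> portsOf(String connect) {
        int slash = connect.indexOf('/');
        String hosts = slash >= 0 ? connect.substring(0, slash) : connect;
        List<Integer> ports = new ArrayList<>();
        for (String hostPort : hosts.split(",")) {
            int colon = hostPort.lastIndexOf(':');
            if (colon >= 0) {
                ports.add(Integer.parseInt(hostPort.substring(colon + 1).trim()));
            } else {
                // Assumption: no explicit port means the ZooKeeper default.
                ports.add(ZOOKEEPER_DEFAULT_PORT);
            }
        }
        return ports;
    }

    // True if any host in the connect string uses Kafka's default broker port.
    static boolean looksLikeKafkaBroker(String connect) {
        return portsOf(connect).contains(KAFKA_DEFAULT_PORT);
    }

    public static void main(String[] args) {
        String[] samples = {"localhost:9092", "localhost:2181"};
        for (String zkConnect : samples) {
            if (looksLikeKafkaBroker(zkConnect)) {
                System.out.println(zkConnect + ": looks like a Kafka broker, not ZooKeeper");
            } else {
                System.out.println(zkConnect + ": plausible ZooKeeper address");
            }
        }
    }
}
```

In the question's main class, the string passed to ZkHosts is the one such a check would flag.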

The remaining code issues were discussed in chat.

The issue was with the Maven dependencies. Include all the required dependencies in pom.xml.

Upvotes: 2
