user84592

Reputation: 4882

Storm cluster mode: distributed bolt/worker load sharing

Hi, I will have a large-capacity Storm analysis task. I want to spread many bolts/workers across different nodes/machines so that every machine shares the load. How should I write the bolts/workers/topology so that they can communicate with each other? In the code below, I submit the topology from one machine. How do I write the bolt/worker/config on the other machines so that the topology is aware of their bolts/workers? I assume I cannot submit the topology on one machine and then submit the same topology again on the others. Any hints on Storm worker load sharing?

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StormClusterMain {
    private static final String SPOUTNAME="KafkaSpout"; 
    private static final String ANALYSISBOLT = "ClusterAnalysisWorker";
    private static final String CLIENTID = "ClusterStorm";
    private static final String TOPOLOGYNAME = "ClusterTopology";

    private static class AppAnalysisBolt extends BaseRichBolt {
        private static final long serialVersionUID = -6885792881303198646L;
        private static final String collectionName="clusterusers";
        private OutputCollector _collector;
        private AtomicInteger index = new AtomicInteger(0); 
        private static final Logger boltLogger = LoggerFactory.getLogger(AppAnalysisBolt.class); 

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {  
            // Routine message, so log at info level rather than error.
            boltLogger.info("Message received:"+tuple.getString(0));
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }


    }

   public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException{

       String zookeepers = null;
       String topicName = null;
       if(args.length == 2 ){
           zookeepers = args[0];
           topicName = args[1];
       }else{
           System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
           System.out.println("Usage :.xxx");
           System.exit(-1);
       }        

       SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
            topicName,
            "",// zookeeper root path for offset storing
            CLIENTID);
       spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
       KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

       TopologyBuilder builder = new TopologyBuilder();
       builder.setSpout(SPOUTNAME, kafkaSpout, 1);
       builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt())
                                                 .shuffleGrouping(SPOUTNAME);

        //Configuration
        Config conf = new Config();
        conf.setDebug(false);
        //Topology run
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());
   }
}

Upvotes: 0

Views: 1182

Answers (1)

nelsonda

Reputation: 1188

You've already done it, unless something has gone wrong.

When you submit a topology to Storm, the Nimbus service looks at the load on the cluster via the Supervisor processes spread throughout the cluster. Nimbus then provides some quantity of resources for the topology to run. Those resources are oftentimes spread throughout the various Supervisor nodes in the cluster, and they will process tuples in parallel. Nimbus occasionally revisits these decisions and changes which nodes process what in an attempt to keep load in the cluster balanced. As a user you should never notice that process.

Assuming your Storm cluster is set up properly, the only thing you have to do is submit the topology. Storm takes care of the whole multi-node parallel processing thing for you.
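One detail worth checking in the question's code: the bolt is declared without a parallelism hint, so Storm runs it as a single executor no matter how many workers are requested. The configuration fragment below is a sketch of the same wiring with a hint added. It is not a complete program: it assumes the question's `StormClusterMain` class, with `SPOUTNAME`, `ANALYSISBOLT`, `TOPOLOGYNAME`, `AppAnalysisBolt` and `kafkaSpout` already in scope, and the value 6 is an arbitrary illustration, not a recommendation.

```java
// Fragment of main() from the question; not runnable on its own.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);

// The third argument is the parallelism hint: the initial number of
// executors for this bolt. 6 is an assumed value for illustration.
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 6)
       .shuffleGrouping(SPOUTNAME);

Config conf = new Config();
conf.setDebug(false);
// Number of worker JVMs requested for the topology.
conf.setNumWorkers(3);

StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());
```

The topology is still submitted once, from one machine. Nothing extra is written or deployed by hand on the other nodes.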

That said, the AtomicInteger you have in there is going to act pretty weird, as Storm slices up your code across multiple servers, and even multiple JVMs on a single host. If you need individual Storm processes to know about the state of the larger cluster, you will be best served by externalizing that state to some sort of independent datastore (e.g. Redis or HBase).
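To make the counter problem concrete, here is a minimal plain-Java sketch that does not use Storm at all. It imitates what happens when a bolt is serialized and each task gets its own deserialized copy: every copy carries its own `AtomicInteger`. All names here (`CounterStateSketch`, `CountingTask`, `CounterStore`, `InMemoryCounterStore`) are hypothetical, and the in-memory store is only a stand-in for an external service such as Redis or HBase, not a real client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class CounterStateSketch {

    /** Hypothetical stand-in for a bolt that keeps a counter in a field. */
    static class CountingTask implements Serializable {
        private static final long serialVersionUID = 1L;
        private final AtomicInteger index = new AtomicInteger(0);

        int process() {
            return index.incrementAndGet();
        }
    }

    /** Hypothetical interface for a counter that lives outside the tasks. */
    interface CounterStore {
        long increment(String key);
    }

    /** In-memory stand-in so the sketch runs in one JVM; not a real datastore. */
    static class InMemoryCounterStore implements CounterStore {
        private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

        @Override
        public long increment(String key) {
            return counters.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
        }
    }

    /** Copies a task through Java serialization, as if shipping it to another JVM. */
    static CountingTask copy(CountingTask original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CountingTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    /** Two independent copies each count only the tuples they saw themselves. */
    public static int[] localCounts(int tuplesPerTask) {
        CountingTask template = new CountingTask();
        CountingTask taskA = copy(template);
        CountingTask taskB = copy(template);
        int a = 0;
        int b = 0;
        for (int i = 0; i < tuplesPerTask; i++) {
            a = taskA.process();
            b = taskB.process();
        }
        return new int[] {a, b};
    }

    /** Two simulated tasks incrementing one shared store see a single total. */
    public static long sharedCount(int tuplesPerTask) {
        CounterStore store = new InMemoryCounterStore();
        long last = 0;
        for (int i = 0; i < tuplesPerTask; i++) {
            last = store.increment("tuples"); // simulated task A
            last = store.increment("tuples"); // simulated task B
        }
        return last;
    }

    public static void main(String[] args) {
        int[] local = localCounts(3);
        System.out.println("task A count: " + local[0] + ", task B count: " + local[1]);
        System.out.println("shared store count: " + sharedCount(3));
    }
}
```

With three tuples per task, each copy ends at 3 while the shared store ends at 6. In a real cluster the store would have to be a network service reachable from every worker, since workers do not share memory.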

Upvotes: 4
