user3017482

Reputation: 33

Storm processing data extremely slowly

What is the ideal number of messages that can be processed on a single-node machine with 1 spout and 1 bolt? And what are the possible ways to increase the processing speed of a Storm topology?

Update: This is the sample code. It doesn't include the code for RabbitMQ and Cassandra, but it shows the same performance issue.

// Topology Class
public class SimpleTopology {

public static void main(String[] args) throws InterruptedException {
    System.out.println("hiiiiiiiiiii");
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("SimpleSpout", new SimpleSpout());
    topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout");

    Config config = new Config();
    config.setDebug(true);
    config.setNumWorkers(2);

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology());

    Thread.sleep(2000);
}

}

// Simple Bolt 
public class SimpleBolt implements IRichBolt{

private OutputCollector outputCollector;

public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
    this.outputCollector = oc;
}

public void execute(Tuple tuple) {
    this.outputCollector.ack(tuple);
}

public void cleanup() {
    // TODO
}

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    // TODO
}

public Map<String, Object> getComponentConfiguration() {
    return null;
}

}

// Simple Spout

public class SimpleSpout implements IRichSpout{

private SpoutOutputCollector spoutOutputCollector;
private boolean  completed = false;
private static int i = 0;

public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {      
    this.spoutOutputCollector = soc;
}

public void close() {
    // Todo
}

public void activate() {
    // Todo
}

public void deactivate() {
    // Todo
}

public void nextTuple() {
    if(!completed)
    {
        if(i < 100000)
        {
            String item = "Tag" + Integer.toString(i++);
            System.out.println(item);
            this.spoutOutputCollector.emit(new Values(item), item);
        }
        else
        {  
            completed = true;
        }
    }
    else
    {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

public void ack(Object o) {
    System.out.println("\n\n OK : " + o);
}

public void fail(Object o) {
    System.out.println("\n\n Fail : " + o);
}

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declare(new Fields("word"));
}

public Map<String, Object> getComponentConfiguration() {
    return null;
}

}

Update: Is it possible that with shuffle grouping the same tuple will be processed more than once? The configuration used is spouts = 4, bolts = 4. The problem now is that performance decreases as the number of bolts increases.

Upvotes: 3

Views: 3563

Answers (3)

Chris Gerken

Reputation: 16392

In your code you only emit one tuple per call to nextTuple(). Try emitting more tuples per call.

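Something like:

public void nextTuple() {

    int max = 1000;   // upper bound on tuples emitted per call
    int count = 0;
    try {
        while (count < max) {
            GetResponse response = channel.basicGet(queueName, autoAck);
            if (response == null) {
                break;   // queue is empty
            }

            // process message (assumes a text payload)
            String item = new String(response.getBody());

            spoutOutputCollector.emit(new Values(item), item);

            count++;
        }

        Thread.sleep(2000);
    } catch (IOException ex) {
        // basicGet failed; skip this call and let Storm call nextTuple() again
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}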

Upvotes: 2

Vor

Reputation: 35129

We are successfully using RabbitMQ and Storm. The results get stored in a different DB, but that should not matter here. We first used basic_get in the spout and had terrible performance, but then we switched to basic_consume, and performance is now very good. So take a look at how you are consuming messages from Rabbit. Some important factors:

  • basic_consume instead of basic_get
  • prefetch_count (make it high enough)
  • If you want to increase performance and you don't care about losing messages, do not ack messages and set delivery_mode to 1.
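To illustrate why basic_consume with a prefetch limit tends to beat basic_get, here is a minimal sketch of the hand-off pattern. It deliberately uses only the JDK, not the RabbitMQ client: `PrefetchBuffer`, `onDelivery` and `drain` are hypothetical names made up for this example. The assumption is that the consumer callback registered via basic_consume (`basicConsume` in the Java client) calls `onDelivery`, that the buffer capacity matches the prefetch_count set on the channel (`basicQos` in the Java client), and that the spout's nextTuple() calls `drain` and emits whatever it gets back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helper: bounded hand-off between a push-style consumer callback and nextTuple(). */
class PrefetchBuffer {
    private final BlockingQueue<String> buffer;

    /** Capacity is assumed to equal the prefetch count configured on the channel. */
    PrefetchBuffer(int prefetchCount) {
        this.buffer = new ArrayBlockingQueue<>(prefetchCount);
    }

    /** Called from the consumer callback thread; returns false if the buffer is full. */
    boolean onDelivery(String body) {
        return buffer.offer(body);
    }

    /** Called from nextTuple(); never blocks and returns at most maxBatch messages. */
    List<String> drain(int maxBatch) {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch, maxBatch);
        return batch;
    }
}
```

With this shape, nextTuple() never waits on a network round trip per message, which is what basic_get forces. The broker pushes messages ahead of time, and the spout only reads from local memory.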

Upvotes: 0

schiavuzzi

Reputation: 1072

You should find out where the bottleneck is: RabbitMQ or Cassandra. Open the Storm UI and take a look at the latency times for each component.

If increasing parallelism didn't help (it normally should), there's definitely a problem with RabbitMQ or Cassandra, so you should focus on them.
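As a rough aid for reading those latency numbers, the arithmetic can be sketched like this. `BoltLoad` and both method names are hypothetical. The capacity formula follows what the Storm UI reports for bolts as far as I know (tuples executed times average execute latency, divided by the measurement window), so treat it as an approximation rather than an exact reproduction of the UI.

```java
/** Hypothetical helper for interpreting per-bolt numbers taken from the Storm UI. */
class BoltLoad {

    /** Fraction of the window an executor spent inside execute(); values near 1.0 suggest a saturated bolt. */
    static double capacity(long executed, double executeLatencyMs, long windowMs) {
        return executed * executeLatencyMs / windowMs;
    }

    /** Upper bound on tuples per second for a bolt, given its execute latency and executor count. */
    static double maxTuplesPerSecond(double executeLatencyMs, int executors) {
        return executors * 1000.0 / executeLatencyMs;
    }
}
```

For example, a bolt with a 10 ms execute latency and 4 executors cannot exceed about 400 tuples per second. If that latency comes from a Cassandra write, adding executors only helps until Cassandra itself becomes the limit.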

Upvotes: 4
