Ramkumar KS

Reputation: 479

Storm not honoring max spout pending

I have created a sample topology to test the max spout pending property. It is a simple topology with one spout and one bolt. The spout emits 100000 tuples, and the bolt acks each tuple after sleeping for one second. I have set max spout pending to 10. I assume this means that the spout will not emit any more tuples once 10 of its tuples are still un-acked. But when I run the topology, I can see the spout emit 2160 messages and then wait. Is my understanding correct, or am I missing something? I am using Storm 0.9.5. Below is the code:

public static void main(String[] args) {

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new TestSpout(), 1);
    builder.setBolt("bolt", new TestBolt(),1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setNumWorkers(1);
    conf.setMaxSpoutPending(10);
    try {
        StormSubmitter.submitTopology("test", conf, builder.createTopology());
    } catch (AlreadyAliveException e) {
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        e.printStackTrace();
    }
}


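import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class TestSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int count = 1;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("spoutData"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        System.out.println(context.maxTopologyMessageTimeout());
    }

    @Override
    public void nextTuple() {
        if (count <= 100000) {
            System.out.println("Emitting : " + count);
            collector.emit(new Values(count++ + ""));
        }
    }
}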

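import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class TestBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            System.out.println(input.getString(0));
            Thread.sleep(1000);
            collector.ack(input);
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("Exception");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing, so there are no output fields to declare.
    }
}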

Upvotes: 1

Views: 394

Answers (1)

Matthias J. Sax

Reputation: 62330

You need to assign message IDs to the tuples you emit in your Spout.nextTuple() method. Otherwise, the parameter topology.max.spout.pending (which is what Config.setMaxSpoutPending() sets) is ignored. For example, you can use your count variable as the ID (basically anything can be used as an ID; it only needs to be unique):

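@Override
public void nextTuple() {
    if (count <= 100000) {
        int id = count++;
        System.out.println("Emitting : " + id);
        // The second argument is the message ID; it lets Storm track this tuple as pending until it is acked
        collector.emit(new Values(String.valueOf(id)), id);
    }
}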

Without a message ID, Storm cannot link the tuples your spout emits to the tuples that are acked in your bolt, i.e., Storm cannot count how many tuples are pending. Only tuples with an ID can be tracked by Storm.
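To make the gating behavior concrete, here is a toy model in plain Java. It is a sketch under simplifying assumptions, not Storm's actual executor code: the hypothetical class `PendingModel` only counts tuples emitted with a non-null message ID as pending, and "stops calling nextTuple()" once that count reaches the limit. It ignores ackers, message timeouts, and internal queue buffering, so it will not reproduce the exact 2160 figure from the question; it only shows why the limit never applies when no IDs are given.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model of max-spout-pending gating (NOT Storm's real executor logic).
final class PendingModel {
    private final int maxSpoutPending;
    private final Set<Object> pending = new HashSet<>(); // IDs emitted but not yet acked
    private int emitted = 0;

    PendingModel(int maxSpoutPending) {
        this.maxSpoutPending = maxSpoutPending;
    }

    // Would the (modeled) executor call nextTuple() right now?
    boolean canCallNextTuple() {
        return pending.size() < maxSpoutPending;
    }

    // Models collector.emit(values, msgId); a null msgId means the tuple is untracked.
    void emit(Object msgId) {
        emitted++;
        if (msgId != null) {
            pending.add(msgId);
        }
    }

    // Models the ack for a tracked tuple arriving back at the spout.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    int emitted() {
        return emitted;
    }

    // Runs `attempts` loop iterations with no acks arriving; returns how many tuples were emitted.
    static int run(int maxSpoutPending, boolean withIds, int attempts) {
        PendingModel m = new PendingModel(maxSpoutPending);
        for (int i = 1; i <= attempts; i++) {
            if (m.canCallNextTuple()) {
                m.emit(withIds ? Integer.valueOf(i) : null);
            }
        }
        return m.emitted();
    }

    public static void main(String[] args) {
        // With IDs, emission stops at the limit until acks arrive.
        System.out.println("with IDs:    " + run(10, true, 1000));
        // Without IDs nothing is ever pending, so the limit never kicks in.
        System.out.println("without IDs: " + run(10, false, 1000));
    }
}
```

Running `main` prints 10 for the "with IDs" case and 1000 for the "without IDs" case. In a real spout, the matching hooks are the `ack(Object msgId)` and `fail(Object msgId)` callbacks, which Storm invokes with the same message ID you passed to `emit`.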

Upvotes: 3
