Reputation: 479
I have created a sample topology to test set the max spout spending property. It is a simple toplogy with 1 spout and a bolt. The spout emits 100000 tuples and the bolt acks after sleeping for a second. I have set the max spout spending property to 10. I assume this means that a spout will not emit any tuples if the non acked messages count is 10 for that spout. But when I run the topology, I can see the spout emitting 2160 messages and then waits. Is my understanding correct or am I missing some thing. I am using storm 0.9.5. Below is the code
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(), 1);
builder.setBolt("bolt", new TestBolt(),1).shuffleGrouping("spout");
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(10);
try {
StormSubmitter.submitTopology("test", conf, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
public class TestSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int count = 1;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("spoutData"));
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
System.out.println(context.maxTopologyMessageTimeout());
}
@Override
public void nextTuple() {
if(count <= 100000) {
System.out.println("Emitting : " + count);
collector.emit(new Values(count++ + ""));
}
}
}
public class TestBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
try {
System.out.println(input.getString(0));
Thread.sleep(1000);
collector.ack(input);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Exception");
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
Upvotes: 1
Views: 394
Reputation: 62330
You need to assign message IDs to the tuples you emit in your Spout.nextTuple()
method. Otherwise, parameter max.spout.pending
is ignored. For example, you can use your count
variable as ID (Basically, anything can be used as an ID. It must only be unique.)
@Override
public void nextTuple() {
if(count <= 100000) {
System.out.println("Emitting : " + count);
collector.emit(new Values(count++ + ""), count);
}
}
Otherwise, Storm cannot link output tuples to the tuples that are acked in your bolt, ie, Storm cannot count how many tuples are pending. Only tuple with an ID can be tracked by Storm.
Upvotes: 3