Adrian Seungjin Lee
Adrian Seungjin Lee

Reputation: 1666

stop processing tuple in certain bolt

I have a topology for example composed of 1 spout and 4 bolts

spout A -> bolt B -> bolt C -> bolt E
                  -> bolt D

Only if some conditional statement in bolt B is true then it passes a tuple to bolt C and bolt D.

And only if some conditional statement in bolt C is true then it passes a tuple to bolt E.

So single tuple may reach only bolt B or (bolt C and D).

I'm using BaseBasicBolt which to my knowledge it acks automatically after collector.emit is called.

For example execute method in bolt B is like below

public class boltB extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        ...some logic goes here
        if (response.getCount() > 0) {
            collector.emit(new Values(tuple.getString(0)));
        }
    }
}

So if collector.emit is not called, I think tuple from spout is failed because I see from storm ui that almost all of tuples from spout is failed.

In this case, where should I call 'ack' for spout not to consider it as failed tuple?

Upvotes: 5

Views: 2894

Answers (2)

Ben Tse
Ben Tse

Reputation: 669

What you are doing is correct for the logic you are implementing. You do not need to explicitly call ack(). When using BaseBasicBolt, each tuple are acked after the execute() method by BasicBoltExecutor. For the failed tuples, you should check for exceptions. Also try looking at Storm UI for anomalies in tuple emitted/executed/failed for each spout and bolt.

Upvotes: 7

Mzf
Mzf

Reputation: 5260

When you have BaseBasicBolt - acking is done for you, even if you are not emitting anything.

The BaseBasicBolt instance is executed in BasicBoltExecutor, whose execute() method is shown below:

public void execute(Tuple input) {
     _collector.setContext(input);
     try {
         _bolt.execute(input, _collector);
         _collector.getOutputter().ack(input);
     } catch(FailedException e) {
         if(e instanceof ReportedFailedException) {
             _collector.reportError(e);
         }
         _collector.getOutputter().fail(input);
     }
}

So in order to stop processing a tuple , just don't emit, after the execute call, it will be acked. Since there no more bolts to run the ack call back in the spout will be called

Hope it's answer your questions

Upvotes: 4

Related Questions