Saurabh

Reputation: 73589

Is it necessary to ack a tuple in storm bolt

This seems confusing: in some examples I have seen, ack is called on the tuple in each bolt, while in other places that is not the case. What is the practice regarding this, and what are its implications?

Upvotes: 1

Views: 1938

Answers (1)

Saurabh

Reputation: 73589

After searching around on the internet and reading this answer, I found this link from the docs which is really helpful in this regard.

How a spout handles messages:

When a spout takes a message from the source, say a Kafka or Kestrel queue, it opens the message. This means the message is not actually taken off the queue yet, but is instead placed in a "pending" state, waiting for acknowledgement that the message has been completed. While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects, all pending messages for that client are put back on the queue.

When a message is opened, Kestrel provides the client with the data for the message as well as a unique id for the message. The KestrelSpout uses that exact id as the message id for the tuple when emitting the tuple to the SpoutOutputCollector. Sometime later on, when ack or fail are called on the KestrelSpout, the KestrelSpout sends an ack or fail message to Kestrel with the message id to take the message off the queue or have it put back on.
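To make that contract concrete, here is a minimal, self-contained sketch of a spout following this pattern. An in-memory queue and a "pending" map stand in for Kestrel, the class and field names are made up for illustration, and the package names assume Apache Storm 1.x+:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Stand-in for a real Kestrel/Kafka spout: an in-memory queue plus a "pending" map.
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private LinkedBlockingQueue<String> _queue;          // plays the role of the external queue
    private ConcurrentHashMap<Object, String> _pending;  // messages handed out but not yet acked

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _queue = new LinkedBlockingQueue<>();
        _pending = new ConcurrentHashMap<>();
        _queue.add("the cow jumped over the moon");
    }

    public void nextTuple() {
        String sentence = _queue.poll();
        if (sentence == null) {
            return;
        }
        Object msgId = UUID.randomUUID();
        _pending.put(msgId, sentence);                   // "opened": off the queue, waiting for ack
        _collector.emit(new Values(sentence), msgId);    // emitting with a message id enables acking
    }

    public void ack(Object msgId) {
        _pending.remove(msgId);                          // tuple tree fully processed: drop the message
    }

    public void fail(Object msgId) {
        String sentence = _pending.remove(msgId);        // processing failed: put it back for replay
        if (sentence != null) {
            _queue.add(sentence);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}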

When is Ack needed:

A client needs to tell Storm whenever it creates a new link in the tree of tuples (also called anchoring), which is done when emitting a new tuple.

A client also needs to tell Storm when it has finished processing an individual tuple, which is done by calling ack. By doing both of these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately.

In the following example, the bolt splits a tuple containing a sentence into a tuple for each word. Each word tuple is anchored by specifying the input tuple as the first argument to emit. Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed later on if the word tuple fails to be processed downstream.

// Imports assume Apache Storm 1.x+ package names (older releases used backtype.storm.*).
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            // passing the input tuple as the first argument anchors the word tuple to it
            _collector.emit(tuple, new Values(word));
        }
        // tell Storm this bolt is done with the input tuple
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

In contrast, if the word tuple is emitted like this:

_collector.emit(new Values(word));

Emitting the word tuple this way causes it to be unanchored. If the tuple fails to be processed downstream, the root tuple will not be replayed. Depending on the fault-tolerance guarantees you need in your topology, sometimes it's appropriate to emit an unanchored tuple.
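For completeness, a downstream bolt signals a processing failure either by calling fail on its OutputCollector or simply by not acking before the tuple timeout; that is what triggers the replay of the anchored spout tuple. A sketch of a hypothetical downstream bolt (the bolt name and the failure condition are made up):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical downstream bolt consuming the word tuples.
public class WordHandlerBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        try {
            String word = tuple.getString(0);
            // ... do something with the word ...
            _collector.ack(tuple);   // marks this node of the tuple tree as complete
        } catch (Exception e) {
            _collector.fail(tuple);  // fails the tree: the anchored spout tuple is replayed
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // emits nothing
    }
}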

When is Ack not needed:

In many cases, bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called IBasicBolt (with the BaseBasicBolt base class) that encapsulates this pattern for you.

The SplitSentence example from above can be written as a BasicBolt as follows:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            // automatically anchored to the input tuple; no explicit ack needed
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

This implementation is simpler than the implementation from before and is semantically identical. Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.

Edit

As commented, and as can be seen here, IBasicBolt takes care of acking for you, so this applies to any class implementing IBasicBolt:

/**
 * Process the input tuple and optionally emit new tuples based on the input tuple.
 * 
 * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
 */

BaseBasicBolt implements IBasicBolt, whereas BaseRichBolt implements IRichBolt, so only bolts extending BaseBasicBolt get this automatic acking.
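Following the javadoc above, here is a sketch of failing a tuple from a BasicBolt by throwing FailedException; the validation check is made up for illustration:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ValidatingSplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        if (sentence == null || sentence.isEmpty()) {
            // tells the framework to fail the input tuple instead of acking it
            throw new FailedException("empty sentence");
        }
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
        // no explicit ack: the input tuple is acked automatically when execute returns
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}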

Upvotes: 2
