perkss

Reputation: 1067

Apache Storm: Ack not working

I am trying to implement guaranteed message processing, but the ack and fail methods on the spout are never called.

I am passing a message ID object with each tuple emitted from the spout. Each bolt anchors its emit on the input tuple and calls collector.ack(tuple).

Question: neither ack nor fail is being called on the spout, and I cannot work out why.

Here is a shortened code sample.

Spout Code using BaseRichSpout

public void nextTuple() {
    for (String usage : usageData) {
        .... further code ....

        String msgID = UUID.randomUUID().toString()
                + System.currentTimeMillis();

        Values value = new Values(splitUsage[0], splitUsage[1],
                splitUsage[2], msgID);
        outputCollector.emit(value, msgID);
    }
}

@Override
public void ack(Object msgId) {
    this.pendingTuples.remove(msgId);
    LOG.info("Ack " + msgId);
}

@Override
public void fail(Object msgId) {
    // Re-emit the tuple
    LOG.info("Fail " + msgId);
    this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
}
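
For completeness, the pendingTuples map that fail() reads from is filled when the tuple is emitted. A shortened sketch of that bookkeeping, assuming pendingTuples is a Map<String, Values> field on the spout:

private final Map<String, Values> pendingTuples = new HashMap<>();

// Inside nextTuple(), alongside the emit:
Values value = new Values(splitUsage[0], splitUsage[1], splitUsage[2], msgID);
// Remember the values so fail() can re-emit them later
this.pendingTuples.put(msgID, value);
outputCollector.emit(value, msgID);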

Bolt Code using BaseRichBolt

@Override
public void execute(Tuple inputTuple) {
    // Emit anchored on the input tuple, then ack it
    this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
    this.outputCollector.ack(inputTuple);
}

Final Bolt

@Override
public void execute(Tuple inputTuple) {
    ..... Simply reports, does not emit .....
    this.outputCollector.ack(inputTuple);
}
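
For reference, the topology is submitted with an acker running and a message timeout set, roughly like this. The spout/bolt class names and component ids below are placeholders, not my real ones:

Config conf = new Config();
conf.setNumAckers(1);              // run one acker task to track the tuple trees
conf.setMessageTimeoutSecs(30);    // tuples not fully acked within 30s are failed

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("usage-spout", new UsageSpout());
builder.setBolt("server-bolt", new ServerBolt()).shuffleGrouping("usage-spout");
builder.setBolt("report-bolt", new ReportBolt()).shuffleGrouping("server-bolt");

new LocalCluster().submitTopology("usage-topology", conf, builder.createTopology());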

Upvotes: 1

Views: 811

Answers (1)

perkss

Reputation: 1067

The reason the ack did not work was the for loop in the spout's nextTuple(). I changed it to the counter version below, placed after the emit, and it works.

Example

        index++;
        if (index >= dataset.size()) {
            index = 0;
        }

Further to this, thanks to the mailing list for the explanation: the spout runs on a single thread, and looping inside nextTuple() means the method never returns, so that same thread can never call the ack (or fail) method.
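
In context, the reworked nextTuple() ends up looking roughly like this. It is only a sketch: dataset, index, splitUsage and pendingTuples are assumed names based on the snippets in the question.

public void nextTuple() {
    String usage = dataset.get(index);
    // ... further code: parse usage into splitUsage as before ...

    String msgID = UUID.randomUUID().toString()
            + System.currentTimeMillis();
    Values value = new Values(splitUsage[0], splitUsage[1],
            splitUsage[2], msgID);

    this.pendingTuples.put(msgID, value);
    outputCollector.emit(value, msgID);

    // Advance the counter instead of looping; nextTuple() returns after a
    // single emit, so the spout thread is free to call ack() or fail().
    index++;
    if (index >= dataset.size()) {
        index = 0;
    }
}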

Upvotes: 3
