JS_Riddler

What is the point of timing out tuples?

I'm hoping somebody can explain this behavior to me because it seems pretty unexpected.

I'm seeing that timing out a tuple does nothing except call "fail" on the Spout. The tuple itself is still processed through the Topology; acking or failing it simply no longer has any effect. A second problem is that the number of tuples actually in flight grows beyond the max-spout-pending limit: timed-out tuples no longer count as pending, even though they are still flowing through the topology. Unless I'm missing something, these two problems combined make timing out tuples utterly pointless at best, and very problematic at worst, since it just throws more tuples into a topology that is already maxed out.

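Here's my topology: a Spout with max spout pending set to 5 and a 5-second message timeout, feeding a single Bolt that sleeps for 4 seconds before acking each tuple (the full code and output are below).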

I would expect 1 or 2 tuples to get acked and 3 or 4 tuples to time out, after which the Bolt would process the resent tuples. Over time, more and more tuples would be acked (though they would frequently time out).

What I'm seeing instead is that even though tuples have timed out, the Bolt still processes them. I'm assuming the Bolt has an input buffer/queue and that timed-out tuples are not cleared from it. Either way, this leads to every tuple timing out, because eventually the Bolt only ever processes tuples that have already timed out.
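
To see why the backlog only grows, here is a toy, single-threaded Java model of this setup. It is not Storm code, and TimeoutBacklogModel is a hypothetical name. It assumes one Bolt executor with a FIFO input queue, one-second ticks, and that a timed-out tuple frees its max-spout-pending slot and is replayed under a new id while its old copy stays in the Bolt's queue. Storm's real timeout timing is less regular, so the numbers will not match the log below exactly; the trend is what matters.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the question's topology (hypothetical, not Storm code).
 * Assumptions: one Bolt executor with a FIFO queue, 1-second ticks, and a timed-out
 * tuple frees its pending slot and is replayed under a new id, while its old copy
 * stays queued at the Bolt.
 */
public class TimeoutBacklogModel {
    static final int MAX_PENDING = 5;   // conf.setMaxSpoutPending(5)
    static final int TIMEOUT_SECS = 5;  // conf.setMessageTimeoutSecs(5)
    static final int SERVICE_SECS = 4;  // Utils.sleep(4000) in the Bolt

    int acked = 0;
    int failed = 0;
    int lastStartedAge = 0;  // age (in seconds) of the tuple the Bolt started most recently
    final ArrayDeque<int[]> boltQueue = new ArrayDeque<>(); // entries are {id, emitSecond}

    void run(int seconds) {
        Map<Integer, Integer> pending = new HashMap<>(); // id -> second it was emitted
        int nextId = 0;
        int[] inBolt = null;  // tuple the Bolt is currently sleeping on
        int busyUntil = 0;

        for (int now = 0; now <= seconds; now++) {
            // 1. The Bolt finishes its current tuple and acks it; the ack only counts if still pending.
            if (inBolt != null && now >= busyUntil) {
                if (pending.remove(inBolt[0]) != null) {
                    acked++;
                }
                inBolt = null;
            }
            // 2. The spout fails every tuple that has been pending for the full timeout.
            final int t = now;
            int before = pending.size();
            pending.values().removeIf(emitted -> t - emitted >= TIMEOUT_SECS);
            failed += before - pending.size();
            // 3. The spout refills its free pending slots (replays get fresh ids).
            while (pending.size() < MAX_PENDING) {
                pending.put(nextId, now);
                boltQueue.addLast(new int[] {nextId, now});
                nextId++;
            }
            // 4. An idle Bolt takes the next queued tuple, whether or not it already timed out.
            if (inBolt == null && !boltQueue.isEmpty()) {
                inBolt = boltQueue.pollFirst();
                lastStartedAge = now - inBolt[1];
                busyUntil = now + SERVICE_SECS;
            }
        }
    }

    public static void main(String[] args) {
        TimeoutBacklogModel m = new TimeoutBacklogModel();
        m.run(60);
        // prints: acked=1 failed=59 queued=49 lastStartedAge=46s
        System.out.println("acked=" + m.acked + " failed=" + m.failed
                + " queued=" + m.boltQueue.size() + " lastStartedAge=" + m.lastStartedAge + "s");
    }
}
```

Over 60 simulated seconds only the very first tuple is ever acked: every later tuple times out before the Bolt reaches it, the queue keeps growing, and the Bolt ends up working on a tuple that was emitted 46 seconds earlier.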

I'm assuming, and hoping, that I'm missing something obvious here…

Two questions:

  1. Can I prevent Bolts from processing already-timed-out tuples?
  2. What is the point of timing out tuples? It does nothing but call fail on the Spout even though the tuple will still be processed by the rest of the Topology!

Thanks.

Spout:

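// Storm 1.x+ package names; releases before 1.0 used backtype.storm.* instead.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleSpout extends BaseRichSpout {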
    private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);

    SpoutOutputCollector collector;
    Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
    Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();

    int contentCounter;
    int curMsgId;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field 0 is the message id, which is reassigned on every emit (including replays);
        // field 1 is the content, which stays the same when a failed tuple is replayed.
        declarer.declare(new Fields("msg-id", "content"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // either replay a failed tuple, or create a new one
        List<Object> tuple = null;
        if (replay_queue.size() > 0){
            tuple = replay_queue.poll();
        }else{
            tuple = new ArrayList<Object>();
            tuple.add(null);
            tuple.add("Content #" + contentCounter++);
        }

        // increment msgId and set it as the first item in the tuple
        int msgId = this.curMsgId++;
        tuple.set(0, msgId);
        logger.info("Emitting: " + tuple);
        // add this tuple to the 'pending' map, and emit it.
        pending_map.put(msgId, tuple);
        collector.emit(tuple, msgId);
        Utils.sleep(100);
    }

    @Override
    public void ack(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> acked_tuple = pending_map.remove(msgId);
        logger.info("Acked: " + acked_tuple);
    }

    @Override
    public void fail(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> failed_tuple = pending_map.remove(msgId);
        logger.info("Failed: " + failed_tuple);

        // put a copy into the replay queue
        ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
        replay_queue.add(copy);
    }
}

Bolt:

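// Storm 1.x+ package names; releases before 1.0 used backtype.storm.* instead.
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamplePrintBolt extends BaseRichBolt {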

    private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        logger.info("I see: " + input.getValues() + ". Will now sleep...");
        Utils.sleep(4000);
        logger.info("Done sleeping. Acking: "  + input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // doesn't emit
    }
}

Main:

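// Storm 1.x+ package names; releases before 1.0 used backtype.storm.* instead.
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

// Hypothetical wrapper class name; the original post shows only the main method.
public class SampleTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);     // at most 5 un-acked tuples per spout task
        conf.setMessageTimeoutSecs(5);  // fail tuples not fully processed within 5 seconds

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SampleSpout());
        builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local", conf, builder.createTopology());
    }
}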

Output:

30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
-- So far, so good… however, now it's time for things to time out.
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
-- Acking a timed-out tuple… this does nothing.
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
-- Why is it looking at tuple #3?  This has already failed.
45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
-- More timeouts…
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep…
-- It's clear that the Bolt looks at tuples even if they have timed out. Its queue will get longer and longer, and tuples will always time out.
65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep…
-- The problem keeps getting worse… the Bolt is now looking at tuples that failed about 30 seconds ago.

Answers (1)

JS_Riddler

"Timeouts" are a feature for catching tuples that have gone missing: either they were sent to workers that died, or a Bolt simply never acked or failed them.

Thus, "topology.message.timeout.secs" should be set to some value that you think your topology will never exceed, but which is also within your requirements for how long something can take to be processed. If your value is too low, you risk timing out tuples that are still "alive", and clogging up your system (as in the original post). If your value is too high, you risk waiting too long to reprocess a failed tuple (again, this depends on your requirements).
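
As a rough way to pick that value, here is a back-of-the-envelope check in Java. It assumes a single Bolt executor that processes tuples one at a time in FIFO order and ignores network and acking overhead; worstCaseLatencyMillis and timeoutIsSafe are illustrative helpers, not Storm APIs. Under those assumptions a tuple can queue behind up to maxSpoutPending - 1 others and then needs its own processing time, so the worst case is roughly maxSpoutPending times the per-tuple processing time.

```java
/**
 * Back-of-the-envelope check for topology.message.timeout.secs.
 * Illustrative helpers only, not Storm APIs. Assumes one Bolt executor that
 * processes tuples one at a time in FIFO order; network and acking overhead ignored.
 */
public class TimeoutSizing {
    /** A tuple can wait behind (maxSpoutPending - 1) others, then needs its own processing time. */
    static long worstCaseLatencyMillis(int maxSpoutPending, long serviceMillis) {
        return (long) maxSpoutPending * serviceMillis;
    }

    /** True if the timeout is strictly longer than that worst case. */
    static boolean timeoutIsSafe(int timeoutSecs, int maxSpoutPending, long serviceMillis) {
        return timeoutSecs * 1000L > worstCaseLatencyMillis(maxSpoutPending, serviceMillis);
    }

    public static void main(String[] args) {
        // The question's settings: setMaxSpoutPending(5), Utils.sleep(4000), setMessageTimeoutSecs(5)
        System.out.println(worstCaseLatencyMillis(5, 4000)); // 20000
        System.out.println(timeoutIsSafe(5, 5, 4000));       // false
        System.out.println(timeoutIsSafe(30, 5, 4000));      // true
    }
}
```

With the question's settings that is 5 × 4 s = 20 s, four times the configured 5-second timeout, so tuples start timing out as soon as the Bolt's queue fills. The bound only holds while nothing times out; once timeouts begin, replays let the queue grow past maxSpoutPending.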

For the case of timing out "live" tuples, one solution is to synchronize clocks across all machines, emit a timestamp with each tuple, and have each Bolt skip any tuple that is too old, since it can be assumed to have already timed out.
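
A minimal sketch of the timestamp approach, assuming worker clocks are synchronized (for example via NTP) and that the spout adds an extra field, here called "emitted-at" (a hypothetical name), holding System.currentTimeMillis() at emit time. StaleTupleFilter is likewise a hypothetical helper, not part of Storm; the commented lines show where it would sit in the Bolt's execute method, using Storm's Tuple.getLongByField.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of Storm): decides whether a tuple stamped with its
 * spout emit time is old enough that the spout has probably timed it out already.
 * Only meaningful if worker clocks are synchronized.
 */
public class StaleTupleFilter {
    private final long timeoutMillis;

    public StaleTupleFilter(int messageTimeoutSecs) {
        this.timeoutMillis = TimeUnit.SECONDS.toMillis(messageTimeoutSecs);
    }

    /** True if the tuple has been in the topology for at least the message timeout. */
    public boolean isStale(long emittedAtMillis, long nowMillis) {
        return nowMillis - emittedAtMillis >= timeoutMillis;
    }

    // Where it would go in SamplePrintBolt.execute, assuming the spout emits a
    // hypothetical "emitted-at" field set to System.currentTimeMillis():
    //
    //   if (filter.isStale(input.getLongByField("emitted-at"), System.currentTimeMillis())) {
    //       collector.fail(input);  // skip the 4-second work; the spout has (probably) failed it already
    //       return;
    //   }

    public static void main(String[] args) {
        StaleTupleFilter filter = new StaleTupleFilter(5);
        long now = 100_000L;
        System.out.println(filter.isStale(now - 4_000L, now)); // false: 4 s old
        System.out.println(filter.isStale(now - 6_000L, now)); // true: 6 s old
    }
}
```

Failing rather than acking the stale tuple is a deliberate choice: if clock skew makes a tuple look stale before the spout has actually timed it out, failing it still triggers a replay, whereas acking it would silently mark unprocessed work as done. For a tuple the spout has already given up on, the fail should have no further effect, just as the late acks in the log above do nothing.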

In my humble opinion, Storm itself should handle this. It could send (spout, timestamp) data with every tuple and, on each batch of timeouts, simply tell each Bolt to fail anything older than (spout, timeout-timestamp). The comparison is cheap, and a Bolt could discard the (spout, timeout-timestamp) entry as soon as it sees a tuple from that spout that is new enough.
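
The proposal above can be sketched as a per-spout cutoff kept by each Bolt. This is not something Storm provides; SpoutCutoffTracker, onTimeoutBatch and shouldProcess are hypothetical names. It assumes tuples from a given spout reach the Bolt in the order they were emitted, which is what lets the Bolt drop a cutoff once it sees the first tuple newer than it.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the per-spout cutoff idea; Storm does not provide this and every name is hypothetical.
 * Assumes tuples from a given spout reach the Bolt in the order they were emitted.
 */
public class SpoutCutoffTracker {
    // spout id -> cutoff: tuples from that spout emitted before it have already timed out
    private final Map<String, Long> cutoffs = new HashMap<>();

    /** Called when a batch of timeouts is announced for a spout; never moves a cutoff backwards. */
    public void onTimeoutBatch(String spoutId, long cutoffMillis) {
        cutoffs.merge(spoutId, cutoffMillis, Math::max);
    }

    /** True if the Bolt should process the tuple; false if it should fail it unprocessed. */
    public boolean shouldProcess(String spoutId, long emittedAtMillis) {
        Long cutoff = cutoffs.get(spoutId);
        if (cutoff == null) {
            return true;                 // no outstanding cutoff for this spout
        }
        if (emittedAtMillis < cutoff) {
            return false;                // emitted before the cutoff: already timed out
        }
        cutoffs.remove(spoutId);         // first new-enough tuple: with in-order delivery, no older ones remain
        return true;
    }

    public static void main(String[] args) {
        SpoutCutoffTracker tracker = new SpoutCutoffTracker();
        tracker.onTimeoutBatch("spout", 1_000L);
        System.out.println(tracker.shouldProcess("spout", 900L));   // false: older than the cutoff
        System.out.println(tracker.shouldProcess("spout", 1_200L)); // true: new enough, cutoff dropped
        System.out.println(tracker.shouldProcess("other", 0L));     // true: no cutoff for this spout
    }
}
```

Dropping the cutoff on the first new-enough tuple keeps each Bolt's state tiny, but it depends on the in-order assumption; if tuples from one spout could arrive out of order, the Bolt would instead have to keep the cutoff until the next batch replaced it.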
