Logiraptor

Reputation: 1518

Storm ShellBolt "loses" tuples

I have a topology that looks like so:

Storm topology

Notice that Bolt2 and Bolt3 receive tuples from both Bolt1 and Bolt4. All bolts are ShellBolts running Python scripts, and the spout is a ShellSpout running a Python script that reads from RabbitMQ. Everything works as expected except for Bolt4. If I add one message at a time to RabbitMQ, everything works and finishes cleanly. If I queue more messages while one is still pending on Bolt4, Bolt4 never processes them. The other bolts still perform their functions, but Bolt4 just sits there after finishing the first message.

Storm UI shows that all tuples are executed by Bolt4, but only one is acked. None are failed. I'm using Storm 0.9.5 and the multilang Python adapter included in storm-starter.

The ShellBolt and ShellSpout implementations just declare output fields, nothing else.

If I set TOPOLOGY_MAX_SPOUT_PENDING to 1, then it all works, but then I can only process one tuple at a time, and Bolt1 & 2 end up waiting on Bolt4 to be ready.

Each bolt takes between 3 and 30 seconds per tuple to complete.

So my question is: Where should I look next?

EDIT: Here's a minimal failing case.

bolt1.py:

import storm
import time
import json


class Bolt1(storm.BasicBolt):
    # overrides storm.BasicBolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        self.emit("bolt3Queue", objID, **APIArgs)
        self.emit("bolt2Queue", objID, **APIArgs)
        self.emit("bolt4Queue", objID, **APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt1().run()

bolt2.py:

import storm
import time
import json


class Bolt2(storm.BasicBolt):
    # overrides storm.BasicBolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt2().run()

bolt3.py:

import storm
import time
import json


class Bolt3(storm.BasicBolt):
    # overrides storm.BasicBolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt3().run()

bolt4.py:

import storm
import time
import json


class Bolt4(storm.BasicBolt):
    # overrides storm.BasicBolt.process

    def process(self, tup):
        objID, APIArgs = tup.values
        APIArgs = json.loads(APIArgs)
        self.emit("bolt3Queue", objID, **APIArgs)
        self.emit("bolt2Queue", objID, **APIArgs)
        storm.ack(tup)

    def emit(self, stream, objID, **APIArgs):
        tup = [objID, json.dumps(APIArgs)]
        storm.log("Emit [%s] %s" % (stream, tup))
        storm.emit(tup, stream=stream)

if __name__ == '__main__':
    Bolt4().run()

spout.py:

import storm
import random

class Spout(storm.Spout):

    def nextTuple(self):
        storm.emit(["id1234", "{}"], id=str(random.randint(1, 10000)))

if __name__ == '__main__':
    Spout().run()

The topology:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.*;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;


public class PyroTopology {
  public static class PythonBolt extends ShellBolt implements IRichBolt {

    public PythonBolt(String script) {
      super("python", script);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class Bolt4 extends ShellBolt implements IRichBolt {

    public Bolt4() {
      super("python", "bolt4.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
      declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class Bolt1 extends ShellBolt implements IRichBolt {

    public Bolt1() {
      super("python", "bolt1.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
      declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
      declarer.declareStream("bolt4Queue", new Fields("objID", "APIArgs"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class PythonSpout extends ShellSpout implements IRichSpout {

    public PythonSpout() {
      super("python", "spout.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("objID", "APIArgs"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("rabbit", new PythonSpout(), 1);

    builder.setBolt("bolt1", new Bolt1(), 1).
            shuffleGrouping("rabbit");

    builder.setBolt("bolt4", new Bolt4(), 1).
            shuffleGrouping("bolt1", "bolt4Queue");

    builder.setBolt("bolt3", new PythonBolt("bolt3.py"), 1).
            shuffleGrouping("bolt1", "bolt3Queue").
            shuffleGrouping("bolt4", "bolt3Queue");

    builder.setBolt("bolt2", new PythonBolt("bolt2.py"), 1).
            shuffleGrouping("bolt1", "bolt2Queue").
            shuffleGrouping("bolt4", "bolt2Queue");

    Config conf = new Config();
    conf.setStatsSampleRate(1.0);
    conf.put(Config.TOPOLOGY_DEBUG, true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5);
    conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(30000);

      cluster.shutdown();
    }
  }
}

I deploy with this:

storm jar target/storm-starter-0.9.5-jar-with-dependencies.jar storm.starter.PyroTopology vb

After 1 tuple per component, the whole system hangs. No new tuples are processed or failed. Here's my Storm UI after the system gets stuck:

Storm UI output

It just sits like this forever. (I've waited several hours just in case.)

Upvotes: 0

Views: 935

Answers (1)

Matthias J. Sax

Reputation: 62330

You are using BasicBolt, which handles acking automatically for you. Thus, you must not ack input tuples manually in your code. Doing so results in multiple acks for a single tuple, which confuses Storm's mechanism for tracking acks (XOR-ing message IDs and ack IDs). As an alternative, if you need advanced acking behavior, you could implement Bolt instead.
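To make the XOR argument concrete, here is a minimal sketch of why a duplicate ack keeps a tuple tree from completing. It is a toy model, not Storm's acker code: AckTracker, run_tree and the random ids are hypothetical names introduced for illustration, and it assumes every ack, including a duplicate, is XOR-ed into a single checksum per spout tuple.

```python
import random


class AckTracker(object):
    """Toy stand-in for the per-spout-tuple checksum (assumption, not Storm code)."""

    def __init__(self, root_id):
        # The checksum starts as the id of the tuple the spout emitted.
        self.ack_val = root_id

    def ack(self, tuple_id, child_ids=()):
        # An ack XORs in the acked tuple's id together with the ids of
        # the tuples that were emitted while processing it.
        value = tuple_id
        for child in child_ids:
            value ^= child
        self.ack_val ^= value

    def is_complete(self):
        # The tree counts as fully processed once everything cancels out.
        return self.ack_val == 0


def run_tree(acks_per_tuple, seed=0):
    """Replay the question's topology with N acks per processed tuple."""
    rng = random.Random(seed)
    # Hypothetical 64-bit ids; "| 1" keeps every id non-zero.
    t, c3, c2, c4, d3, d2 = [rng.getrandbits(64) | 1 for _ in range(6)]
    tracker = AckTracker(t)
    steps = [
        (t, (c3, c2, c4)),   # bolt1 emits to bolt3, bolt2 and bolt4
        (c4, (d3, d2)),      # bolt4 emits to bolt3 and bolt2
        (c3, ()), (c2, ()),  # bolt3 and bolt2 handle bolt1's tuples
        (d3, ()), (d2, ()),  # bolt3 and bolt2 handle bolt4's tuples
    ]
    for tuple_id, children in steps:
        for _ in range(acks_per_tuple):
            tracker.ack(tuple_id, children)
    return tracker.is_complete()


print(run_tree(1))  # one ack per tuple, as BasicBolt does on its own
print(run_tree(2))  # BasicBolt's ack plus a manual storm.ack(tup)
```

With one ack per tuple every id is XOR-ed in exactly twice (once when emitted, once when acked), so the checksum reaches zero. With two acks per tuple, each ack cancels its twin and the checksum stays at the root id, so in this model the tree never completes.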

As the UI shows, no ack is received by your spout, so Storm stops emitting tuples once max spout pending is reached. Furthermore, you can see that the "executed" and "acked" counts of your bolts do not match, which also indicates that acks are not being processed correctly.
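The stall can be pictured with a small sketch of the pending limit. This is a simplified model under stated assumptions, not Storm's implementation: emitted_before_stall is a hypothetical helper, acks are modeled as arriving immediately or never, and the message timeout is ignored.

```python
def emitted_before_stall(max_pending, acks_arrive, attempts=100):
    """Count how many tuples a spout gets to emit in this toy model."""
    pending = set()
    emitted = 0
    for msg_id in range(attempts):
        if len(pending) >= max_pending:
            # Too many unacked tuples in flight: the spout is not asked
            # for more, so emission stops here.
            break
        pending.add(msg_id)
        emitted += 1
        if acks_arrive:
            # A completed tuple tree frees one pending slot.
            pending.discard(msg_id)
    return emitted


print(emitted_before_stall(5, acks_arrive=True))
print(emitted_before_stall(5, acks_arrive=False))
```

With the question's setting of 5 for TOPOLOGY_MAX_SPOUT_PENDING, the model emits all 100 tuples when acks reach the spout and only 5 when they do not.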

Upvotes: 2
