jvlier
jvlier

Reputation: 87

Unit testing in Apache Storm - timeout with BaseRichBolt but not with BaseBasicBolt

I'm trying to implement unit tests for Storm bolts (Java). The code below works fine and ends with a success on Storm 1.0.3:

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.887 sec

However, when I change BaseBasicParrotBolt on line 46 to BaseRichParrotBolt, the assertions never run and it ends with the following exception:

13610 [main] ERROR o.a.s.testing4j - Error in cluster java.lang.AssertionError: Test timed out (10000ms) (not (every? exhausted? (spout-objects spouts)))

If you step through it with a debugger you'll see that the bolt does receive and emit tuples, but it seems like Testing.completeTopology never returns. I find this really odd because the bolts are virtually the same. All my bolts extend from BaseRichBolt so I'd really like to make it work for those as well. Any ideas?

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.ILocalCluster;
import org.apache.storm.Testing;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestJob;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.List;
import static junit.framework.Assert.*;
import org.junit.Test;

public class StormTestExample {
    private final static String EVENT = "event";
    private final static String SPOUT_ID = "spout";
    private final static String BOLT_ID = "parrot";
    private final static List<String> COMPONENT_IDS = Arrays.asList(SPOUT_ID, BOLT_ID);

    @Test
    public void testBasicTopology() {
        MkClusterParam mkClusterParam = new MkClusterParam();
        mkClusterParam.setSupervisors(4);
        Config daemonConf = new Config();
        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false);
        mkClusterParam.setDaemonConf(daemonConf);

        Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
            @Override
            public void run(ILocalCluster cluster) {
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout(SPOUT_ID, new TestSpout());
                builder.setBolt(BOLT_ID, new BaseBasicParrotBolt()).shuffleGrouping(SPOUT_ID);
                StormTopology topology = builder.createTopology();

                MockedSources mockedSources = new MockedSources();
                mockedSources.addMockData(SPOUT_ID, 
                        new Values("nathan"),
                        new Values("bob"), 
                        new Values("joey"), 
                        new Values("nathan"));

                Config conf = new Config();
                conf.setNumWorkers(2);

                CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
                completeTopologyParam.setMockedSources(mockedSources);
                completeTopologyParam.setStormConf(conf);

                final Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);

                final Values expected = new Values(new Values("nathan"), new Values("bob"), new Values("joey"), 
                        new Values("nathan"));

                for (String component : COMPONENT_IDS) {
                    assertTrue("Error in " + component + " output", 
                               Testing.multiseteq(expected, Testing.readTuples(result, component)));
                }
            }
        });
    }

    private static class TestSpout extends BaseRichSpout {    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
            throw new UnsupportedOperationException(); // Don't need an implementation to run the test.
        }

        @Override
        public void nextTuple() {
            throw new UnsupportedOperationException(); // Don't need an implementation to run the test.
        }
    }

    private static class BaseBasicParrotBolt extends BaseBasicBolt {    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector boc) {
            boc.emit(new Values(tuple.getValue(0)));
        }
    }

     private static class BaseRichParrotBolt extends BaseRichBolt {
        private OutputCollector oc;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
            this.oc = oc;
        }

        @Override
        public void execute(Tuple tuple) {
            oc.emit(new Values(tuple.getValue(0)));
        }
     }
}

Upvotes: 0

Views: 629

Answers (1)

Mobility
Mobility

Reputation: 3305

If using BaseRichBolt , you should call ack() yourself in execute(), which is handled by BaseBasicBolt.

Upvotes: 1

Related Questions