Reputation: 87
I'm trying to implement unit tests for Storm bolts (Java). The code below works fine and ends with a success on Storm 1.0.3:
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.887 sec
However, when I change BaseBasicParrotBolt on line 46 to BaseRichParrotBolt, the assertions never run and the test fails with the following exception:
13610 [main] ERROR o.a.s.testing4j - Error in cluster java.lang.AssertionError: Test timed out (10000ms) (not (every? exhausted? (spout-objects spouts)))
If you step through it with a debugger you'll see that the bolt does receive and emit tuples, but it seems like Testing.completeTopology
never returns. I find this really odd because the bolts are virtually the same. All my bolts extend from BaseRichBolt
so I'd really like to make it work for those as well. Any ideas?
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.ILocalCluster;
import org.apache.storm.Testing;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestJob;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.List;
import static junit.framework.Assert.*;
import org.junit.Test;
/**
 * Unit test for Storm bolts, run against an in-process simulated-time local cluster.
 *
 * <p>A mocked spout feeds four tuples through a "parrot" bolt that echoes each tuple
 * unchanged; the test then asserts that both the spout's and the bolt's recorded
 * output match the expected multiset of values.
 */
public class StormTestExample {

    private static final String EVENT = "event";
    private static final String SPOUT_ID = "spout";
    private static final String BOLT_ID = "parrot";
    private static final List<String> COMPONENT_IDS = Arrays.asList(SPOUT_ID, BOLT_ID);

    @Test
    public void testBasicTopology() {
        MkClusterParam mkClusterParam = new MkClusterParam();
        mkClusterParam.setSupervisors(4);
        Config daemonConf = new Config();
        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false);
        mkClusterParam.setDaemonConf(daemonConf);

        Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
            @Override
            public void run(ILocalCluster cluster) {
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout(SPOUT_ID, new TestSpout());
                // Works with either BaseBasicParrotBolt or BaseRichParrotBolt now that
                // the rich bolt anchors and acks its tuples (see BaseRichParrotBolt).
                builder.setBolt(BOLT_ID, new BaseBasicParrotBolt()).shuffleGrouping(SPOUT_ID);
                StormTopology topology = builder.createTopology();

                // The spout's open()/nextTuple() are never called; completeTopology
                // replaces its output with this mocked data.
                MockedSources mockedSources = new MockedSources();
                mockedSources.addMockData(SPOUT_ID,
                        new Values("nathan"),
                        new Values("bob"),
                        new Values("joey"),
                        new Values("nathan"));

                Config conf = new Config();
                conf.setNumWorkers(2);
                CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
                completeTopologyParam.setMockedSources(mockedSources);
                completeTopologyParam.setStormConf(conf);

                // completeTopology blocks until every mocked tuple's tuple tree is
                // fully acked; an un-acked tuple makes this call time out.
                final Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);

                final Values expected = new Values(
                        new Values("nathan"), new Values("bob"), new Values("joey"),
                        new Values("nathan"));
                for (String component : COMPONENT_IDS) {
                    assertTrue("Error in " + component + " output",
                            Testing.multiseteq(expected, Testing.readTuples(result, component)));
                }
            }
        });
    }

    /**
     * Placeholder spout: only declareOutputFields matters, because
     * Testing.completeTopology substitutes MockedSources for the real output.
     */
    private static class TestSpout extends BaseRichSpout {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
            throw new UnsupportedOperationException(); // Never invoked under mocked sources.
        }

        @Override
        public void nextTuple() {
            throw new UnsupportedOperationException(); // Never invoked under mocked sources.
        }
    }

    /**
     * Echo bolt built on BaseBasicBolt, which anchors emitted tuples and acks the
     * input tuple automatically after execute() returns.
     */
    private static class BaseBasicParrotBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector boc) {
            boc.emit(new Values(tuple.getValue(0)));
        }
    }

    /**
     * Echo bolt built on BaseRichBolt. Unlike BaseBasicBolt, a rich bolt is
     * responsible for reliability itself: it must anchor emitted tuples to the
     * input and explicitly ack the input tuple. Without the ack, the mocked
     * tuples' trees never complete and Testing.completeTopology times out
     * ("not (every? exhausted? (spout-objects spouts))").
     */
    private static class BaseRichParrotBolt extends BaseRichBolt {
        private OutputCollector oc;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
            this.oc = oc;
        }

        @Override
        public void execute(Tuple tuple) {
            // Anchor the emitted tuple to the input so failures are replayed, then
            // ack the input so the tuple tree can complete. BaseBasicBolt did both
            // of these implicitly; BaseRichBolt requires them explicitly.
            oc.emit(tuple, new Values(tuple.getValue(0)));
            oc.ack(tuple);
        }
    }
}
Upvotes: 0
Views: 629
Reputation: 3305
When you extend BaseRichBolt, you are responsible for reliability yourself: in execute() you must anchor your emits to the input tuple (oc.emit(tuple, values)) and then call oc.ack(tuple). BaseBasicBolt does both of these for you automatically. Because your rich bolt never acks, the tuple trees never complete, so Testing.completeTopology keeps waiting and eventually times out.
Upvotes: 1