Reputation: 11751
I'm trying to write a topology that does the following:
In reality I want to do some more processing on the collection.
I tested it locally and it looks like it's working. However, I'm not sure whether I've set the groupings on the bolts correctly, or whether this would work correctly when deployed on an actual Storm cluster. I would appreciate it if someone could review this topology and point out any errors, changes, or improvements.
Thanks.
This is what my topology looks like.
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
       .shuffleGrouping("spout");
builder.setBolt("printaggreator", new PrinterBolt())
       .shuffleGrouping("sampleaggregate");
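For context, this is roughly how I submit it for local testing. A minimal sketch using Storm's LocalCluster; the topology name and config values here are placeholders, not part of the topology itself:

```java
// Sketch of a local test run (topology name and config are placeholders).
Config conf = new Config();
conf.setDebug(true);

// LocalCluster runs the whole topology in-process for testing.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("twitter-aggregate", conf, builder.createTopology());
```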
Aggregation Bolt
public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;

    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;
        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
        }
        if (currentStatus != null) {
            //add it to the status cache
            statusCache.add(currentStatus);
            collector.ack(tuple);
            //check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                collector.emit(new Values(statusCache));
            }
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null; //To change body of implemented methods use File | Settings | File Templates.
    }

    protected void setupNonSerializableAttributes() {
    }
}
Printer Bolt
public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple.size() + " " + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }
}
Upvotes: 12
Views: 6056
Reputation: 1776
Hi. As soon as you try to subscribe to more than one keyword, you will run into problems. I suggest that your spout also emit the original keyword that was used to filter.
Then, instead of a shuffleGrouping, I would use a fieldsGrouping:
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
       .fieldsGrouping("spout", new Fields("keyword"));
This way you make sure the results for a single keyword always end up on the same bolt instance, so you can compute aggregates correctly. If you omit the fieldsGrouping, Storm can instantiate any number of your aggregate bolts and route any message from the spout to any instance, which would ultimately produce wrong results.
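For this to work, the spout has to declare and emit the keyword field. A rough sketch of what that could look like; the field names "status" and "keyword" are my own choices, not something from your code:

```java
// Hypothetical sketch: the spout declares both the status and the keyword
// it was filtered on, so "keyword" can later drive a fieldsGrouping.
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("status", "keyword"));
}

// Then, wherever the spout emits, include the keyword with each status:
// collector.emit(new Values(status, keyword));
```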
Upvotes: 0
Reputation: 16392
From what I can see it looks good. The devil's in the details, though. I'm not sure what your aggregator bolt does, but if it makes any assumptions about the values being passed to it, then you should consider an appropriate fields grouping. This might not make much of a difference while you're using the default parallelism hint of 1, but should you decide to scale out with multiple aggregate bolt instances, any implicit assumptions in your logic may call for a non-shuffle grouping.
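For illustration, scaling the aggregator out while keeping related tuples together might look like this. Note the "keyword" field is an assumption on my part; your spout currently emits only the status, so it would need to emit such a field first:

```java
// Hypothetical sketch: run 4 aggregator instances, and use a fieldsGrouping
// so that all tuples with the same "keyword" value reach the same instance.
builder.setBolt("sampleaggregate", new SampleAggregatorBolt(), 4)
       .fieldsGrouping("spout", new Fields("keyword"));
```

With a shuffleGrouping and parallelism greater than 1, each instance would see an arbitrary subset of the stream, so per-keyword aggregates would be split across instances.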
Upvotes: 4