Soumya Simanta

Reputation: 11751

Grouping in a simple aggregation Storm topology

I'm trying to write a topology that does the following:

  1. A spout that subscribes to a twitter feed (based on a keyword)
  2. An aggregation bolt that collects a number of tweets (say N) into a collection and sends them to the printer bolt
  3. A simple bolt that prints the collection to the console at once.

In reality I want to do some more processing on the collection.

I tested it locally and it looks like it's working. However, I'm not sure whether I've set the groupings on the bolts correctly, or whether this would work correctly when deployed on an actual Storm cluster. I would appreciate it if someone could review this topology and point out any errors, changes or improvements.

Thanks.

This is what my topology looks like.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
        .shuffleGrouping("spout");
builder.setBolt("printaggreator", new PrinterBolt())
        .shuffleGrouping("sampleaggregate");

Aggregation Bolt

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import twitter4j.Status;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SampleAggregatorBolt implements IRichBolt {

    /** Number of tweets to collect before emitting a batch (the N above). */
    private static final int BATCH_SIZE = 10;

    protected OutputCollector collector;
    protected Logger log;
    /**
     * Holds the messages in the bolt until you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;

        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        Object value = tuple.getValue(0);
        if (value instanceof Status) {
            // add it to the status cache
            statusCache.add((Status) value);
        } else {
            log.warn("Ignoring unexpected tuple: " + tuple);
        }
        // Ack every input; an un-acked tuple would time out and be replayed
        collector.ack(tuple);

        // Once a full batch is cached, emit a copy and start a new batch.
        // Without clear(), every later tuple would re-emit the growing list.
        if (statusCache.size() >= BATCH_SIZE) {
            collector.emit(new Values(new ArrayList<Status>(statusCache)));
            statusCache.clear();
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null; // no component-specific configuration
    }
}
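The batching step in the aggregator boils down to a count-based buffer: collect items until N are held, hand out a copy, then start over. Below is a Storm-free sketch of just that logic, so it can be exercised without a cluster; `TweetBatcher` is a hypothetical name and not part of Storm or twitter4j.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, Storm-free model of the batching in SampleAggregatorBolt:
 * collect items until batchSize is reached, then return a copy and reset.
 */
public class TweetBatcher<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<T>();

    public TweetBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /**
     * Adds one item. Returns a full batch (an independent copy) once the
     * buffer reaches batchSize, otherwise an empty list.
     */
    public List<T> add(T item) {
        buffer.add(item);
        if (buffer.size() < batchSize) {
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<T>(buffer); // copy, so clearing does not touch the emitted batch
        buffer.clear();                            // start collecting the next batch
        return batch;
    }

    public static void main(String[] args) {
        TweetBatcher<String> batcher = new TweetBatcher<String>(3);
        for (int i = 1; i <= 7; i++) {
            List<String> batch = batcher.add("tweet-" + i);
            if (!batch.isEmpty()) {
                System.out.println("emit " + batch);
            }
        }
    }
}
```

Running `main` prints `emit [tweet-1, tweet-2, tweet-3]` and `emit [tweet-4, tweet-5, tweet-6]`; the seventh tweet stays buffered until two more arrive. Returning a copy matters because the buffer is reused for the next batch.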

Printer Bolt

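import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Print the number of fields and the whole tuple (the batch of tweets)
        System.out.println(tuple.size() + " " + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        // Terminal bolt: it emits nothing, so no output fields are declared
    }

}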

Upvotes: 12

Views: 6056

Answers (2)

Matthias Steinbauer

Reputation: 1776

Hi, as soon as you try to subscribe to more than one keyword, you will run into problems. I suggest that your spout also emits the original keyword that was used for filtering.

Then, instead of a shuffleGrouping, I would use a fieldsGrouping:

builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
        .fieldsGrouping("spout", new Fields("keyword"));

This way you make sure that all tuples for a single keyword always end up at the same bolt instance, so aggregates are computed correctly. Without the fieldsGrouping, Storm may run any number of instances of your aggregate bolt and send any tuple from the spout to any of them, which would eventually cause wrong results.
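The guarantee a fields grouping gives can be modeled with a simple rule: pick the target task from a hash of the grouping field's value, modulo the number of tasks, so equal keywords always land on the same task. This is a minimal sketch under that assumption; `FieldsGroupingDemo` and `chooseTask` are hypothetical names, and Storm's internal hashing may differ in detail.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Simplified model of a fields grouping: the target task depends only on
 * the value of the grouping field, so equal values always go to the same task.
 */
public class FieldsGroupingDemo {

    /** Picks a task index in [0, numTasks) from the keyword's hash code. */
    static int chooseTask(String keyword, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(keyword.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // e.g. a parallelism hint of 3 on "sampleaggregate"
        List<String> keywords = Arrays.asList(
                "pittsburgh", "storm", "pittsburgh", "java", "storm", "pittsburgh");
        for (String keyword : keywords) {
            System.out.println(keyword + " -> task " + chooseTask(keyword, numTasks));
        }
    }
}
```

Every "pittsburgh" line reports the same task index, and likewise for "storm", which is what lets each aggregate bolt instance see the complete stream for the keywords it owns.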

Upvotes: 0

Chris Gerken

Reputation: 16392

From what I can see, it looks good. The devil's in the details, though. I'm not sure what your aggregator bolt does, but if it makes any assumptions about which values are passed to it, you should consider an appropriate fields grouping. This might not make much of a difference yet, since you're using the default parallelism hint of 1, but should you decide to scale out to multiple aggregate bolt instances, any implicit assumptions in your logic may call for a non-shuffle grouping.
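To make the parallelism point concrete, here is a small, Storm-free model of how a shuffle grouping spreads one keyword's tuples across aggregate bolt instances. It assumes round-robin routing as a deterministic stand-in for Storm's randomized shuffle grouping; `ShuffleSplitDemo` and `distribute` are hypothetical names.

```java
import java.util.Arrays;

/**
 * Simplified model of a shuffle grouping: tuples are spread evenly across
 * tasks. Real shuffle grouping is randomized; round-robin keeps this deterministic.
 */
public class ShuffleSplitDemo {

    /** Returns how many of tupleCount tuples each of numTasks tasks receives. */
    static int[] distribute(int tupleCount, int numTasks) {
        int[] perTask = new int[numTasks];
        for (int i = 0; i < tupleCount; i++) {
            perTask[i % numTasks]++; // round-robin stand-in for shuffle grouping
        }
        return perTask;
    }

    public static void main(String[] args) {
        // 10 tweets that all matched the same keyword
        System.out.println("parallelism 1: " + Arrays.toString(distribute(10, 1)));
        System.out.println("parallelism 2: " + Arrays.toString(distribute(10, 2)));
    }
}
```

This prints `parallelism 1: [10]` and `parallelism 2: [5, 5]`. With a single instance, shuffle grouping is harmless because one bolt sees everything; with two, each instance only sees part of the keyword's tweets, so any per-keyword aggregate it computes is partial.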

Upvotes: 4
