user1579557

Reputation: 393

How do I send the output of two bolts to a single bolt in Storm?

What is the easiest way to send the output of BoltA and BoltB to BoltC? Do I have to use joins, or is there a simpler solution? A and B emit the same fields (ts, metric_name, metric_count).

    // KafkaSpout --> LogDecoder
    builder.setBolt(LOGDECODER_BOLT_ID, logdecoderBolt, 10).shuffleGrouping(KAFKA_SPOUT_ID);

    // LogDecoder --> CountBolt
    builder.setBolt(COUNT_BOLT_ID, countBolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);

    // LogDecoder --> HttpResCodeCountBolt
    builder.setBolt(HTTP_RES_CODE_COUNT_BOLT_ID, http_res_code_count_bolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);


    // Now I want to send the output of CountBolt and HttpResCodeCountBolt to AggregateBolt.

    // CountBolt --> AggregateBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((COUNT_BOLT_ID), new Fields("ts"));

    // HttpResCodeCountBolt --> AggregateBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), new Fields("ts"));

Is this possible?

Upvotes: 5

Views: 5217

Answers (2)

Chris Gerken

Reputation: 16392

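Yes. Call setBolt once and attach both groupings to the returned declarer. To tell the inputs apart, add a stream ID ("stream1" and "stream2" below) to each fieldsGrouping call; the upstream bolts must declare and emit on those streams: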

BoltDeclarer bd = builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5);
bd.fieldsGrouping(COUNT_BOLT_ID, "stream1", new Fields("ts"));
bd.fieldsGrouping(HTTP_RES_CODE_COUNT_BOLT_ID, "stream2", new Fields("ts"));

and then in the execute() method for BoltC you can test to see which stream the tuple came from:

public void execute(Tuple tuple) {

    if ("stream1".equals(tuple.getSourceStreamId())) {
        // this came from stream1
    } else if ("stream2".equals(tuple.getSourceStreamId())) {
        // this came from stream2
    }
}

Since you know which stream the tuple came from, the two streams don't need to carry the same tuple shape. You just unmarshal each tuple according to its stream ID.
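
To make the per-stream unmarshalling concrete, here is a minimal plain-Java sketch that runs without Storm. `FakeTuple` is a hypothetical stand-in for Storm's `Tuple` that exposes only a stream ID and positional values, and the two tuple shapes are assumptions chosen for illustration (in the question both bolts actually emit the same fields).

```java
import java.util.Arrays;
import java.util.List;

class StreamDispatch {
    // Hypothetical stand-in for Storm's Tuple: a stream ID plus positional values.
    static final class FakeTuple {
        final String sourceStreamId;
        final List<Object> values;

        FakeTuple(String sourceStreamId, List<Object> values) {
            this.sourceStreamId = sourceStreamId;
            this.values = values;
        }
    }

    // Decode a tuple according to the stream it arrived on.
    // Assumed shapes (not from the original post):
    //   stream1 -> (ts, metric_name, metric_count)
    //   stream2 -> (ts, http_code)
    static String describe(FakeTuple t) {
        switch (t.sourceStreamId) {
            case "stream1":
                return "count ts=" + t.values.get(0)
                        + " " + t.values.get(1) + "=" + t.values.get(2);
            case "stream2":
                return "http ts=" + t.values.get(0) + " code=" + t.values.get(1);
            default:
                throw new IllegalArgumentException("unexpected stream: " + t.sourceStreamId);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(
                new FakeTuple("stream1", Arrays.<Object>asList(1000L, "requests", 42L))));
        System.out.println(describe(
                new FakeTuple("stream2", Arrays.<Object>asList(1000L, 404))));
    }
}
```

In a real bolt the same switch would sit inside execute() and read the stream ID from tuple.getSourceStreamId(). Rejecting unknown streams in the default branch is a design choice here; it surfaces wiring mistakes early rather than silently dropping tuples.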

You can also check which component the tuple came from with tuple.getSourceComponent() (which may be more appropriate in your case), as well as which instance of that component (the task) emitted it with tuple.getSourceTask().

Upvotes: 6

zenbeni

Reputation: 7193

As @Chris said, you can use streams. But you can also simply get the source component from the tuple:

@Override
public final void execute(final Tuple tuple) {
    final String sourceComponent = tuple.getSourceComponent();
    ....
}

The source component is the ID you gave the bolt when you built the topology, for instance COUNT_BOLT_ID.
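
As a rough illustration of how the aggregating bolt might combine both inputs once it knows the source, the plain-Java sketch below keeps per-ts totals and dispatches on the source component name. It runs without Storm: the `onTuple` method, the `"count-bolt"` and `"http-res-code-count-bolt"` ID values, and the in-memory map are all assumptions for illustration, not part of the Storm API or the original post.

```java
import java.util.Map;
import java.util.TreeMap;

class SourceAggregator {
    // Hypothetical ID values; the post does not show what COUNT_BOLT_ID and
    // HTTP_RES_CODE_COUNT_BOLT_ID are actually set to.
    static final String COUNT_BOLT_ID = "count-bolt";
    static final String HTTP_RES_CODE_COUNT_BOLT_ID = "http-res-code-count-bolt";

    // ts -> { total from CountBolt, total from HttpResCodeCountBolt }
    private final Map<Long, long[]> totalsByTs = new TreeMap<>();

    // Stand-in for execute(): sourceComponent plays the role of
    // tuple.getSourceComponent() in a real bolt.
    void onTuple(String sourceComponent, long ts, long metricCount) {
        int slot;
        if (COUNT_BOLT_ID.equals(sourceComponent)) {
            slot = 0;
        } else if (HTTP_RES_CODE_COUNT_BOLT_ID.equals(sourceComponent)) {
            slot = 1;
        } else {
            throw new IllegalArgumentException("unexpected source: " + sourceComponent);
        }
        totalsByTs.computeIfAbsent(ts, k -> new long[2])[slot] += metricCount;
    }

    // Returns a copy so callers cannot mutate the internal totals.
    long[] totalsFor(long ts) {
        return totalsByTs.getOrDefault(ts, new long[2]).clone();
    }

    public static void main(String[] args) {
        SourceAggregator agg = new SourceAggregator();
        agg.onTuple(COUNT_BOLT_ID, 1000L, 3L);
        agg.onTuple(HTTP_RES_CODE_COUNT_BOLT_ID, 1000L, 2L);
        long[] totals = agg.totalsFor(1000L);
        System.out.println("count=" + totals[0] + " http=" + totals[1]);
    }
}
```

Per-task state like this relies on both inputs using fieldsGrouping on ts, as in the question, so that tuples with the same ts from either bolt should be routed to the same task of the aggregating bolt.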

Upvotes: 2
