toy

Reputation: 12141

How to send a tuple to different bolt according to a value in the message

I have a Storm cluster that reads from a Kinesis stream. Each message looks like this:

{
    _c: "a"
}

or like this:

{
    _c: "b"
}

I would like to send a tuple with _c="a" to one bolt and _c="b" to a different bolt. How do I achieve this?

This is the bolt that parses the Kinesis message into a JSON object using Gson:

@Override
public void execute(Tuple tuple) {
  String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
  String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
  byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);

  ByteBuffer buffer = ByteBuffer.wrap(payload);
  String data = null;
  try {
    data = decoder.decode(buffer).toString();

    // The declared map type must match the TypeToken, so both use <String, Object>
    HashMap<String, Object> map = new Gson().fromJson(data, new TypeToken<HashMap<String, Object>>() {}.getType());

    this.outputCollector.emit(tuple, new Values(map));
    this.outputCollector.ack(tuple);

  } catch (CharacterCodingException e) {
    this.outputCollector.fail(tuple);
  }

}

Thanks

Upvotes: 3

Views: 464

Answers (1)

SebastienPattyn

Reputation: 403

You can emit to two named streams from your bolt and declare both output streams:

@Override
public void execute(Tuple tuple) {
    // ...
    // Some Code
    // ...
    // Compare strings with equals(); == compares object references in Java
    if ("a".equals(_c)) {
        collector.emit("stream1", tuple, new Values(_c));
    } else {
        collector.emit("stream2", tuple, new Values(_c));
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declareStream("stream1", new Fields("_c"));
    outputFieldsDeclarer.declareStream("stream2", new Fields("_c"));
}
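
The `// Some Code` placeholder above leaves out how `_c` is read and how the stream is chosen. Below is a minimal sketch of that decision in plain Java, assuming the message has already been parsed into a `Map<String, Object>` as in the question. `StreamRouter` and `streamFor` are hypothetical names, not part of Storm, and returning an empty `Optional` for an unexpected value is an assumption (the snippet above sends everything that is not "a" to "stream2").

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not a Storm class): picks a stream ID from the "_c" field.
class StreamRouter {
    static final String STREAM_A = "stream1";
    static final String STREAM_B = "stream2";

    // Returns the stream ID for the message, or empty if "_c" is missing or unrecognized.
    static Optional<String> streamFor(Map<String, Object> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return Optional.of(STREAM_A);
        }
        if ("b".equals(c)) {
            return Optional.of(STREAM_B);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> message = new HashMap<>();
        message.put("_c", "a");
        System.out.println(streamFor(message)); // prints Optional[stream1]
    }
}
```

Inside `execute`, the returned ID would be passed as the first argument to `collector.emit`. What to do with an empty result (fail the tuple, or ack and drop it) is left open here.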

In your topology, you can then pass the stream ID as the second argument to shuffleGrouping, so that each bolt subscribes to only one stream:

topology.setBolt("FirstBolt", new FirstBolt(), 1);
topology.setBolt("newBolt1", new Custombolt(), 1).shuffleGrouping("FirstBolt", "stream1");
topology.setBolt("newBolt2", new Custombolt(), 1).shuffleGrouping("FirstBolt", "stream2");

Another possibility is to send every tuple to both bolts and have each bolt check the value, running its code only for the tuples meant for it. This is simpler to wire up, but every tuple is delivered twice.
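
For the send-to-both approach, each downstream bolt needs a check like the one sketched below in plain Java. `FieldValueFilter` is a hypothetical helper, not a Storm class, and it assumes the tuple carries the parsed message as a `Map<String, Object>`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Storm class): accepts only messages whose "_c" equals the expected value.
class FieldValueFilter {
    private final String expected;

    FieldValueFilter(String expected) {
        this.expected = expected;
    }

    // True when the message should be processed by the bolt that owns this filter.
    boolean accepts(Map<String, Object> message) {
        return expected.equals(message.get("_c"));
    }

    public static void main(String[] args) {
        Map<String, Object> message = new HashMap<>();
        message.put("_c", "b");
        System.out.println(new FieldValueFilter("a").accepts(message)); // prints false
        System.out.println(new FieldValueFilter("b").accepts(message)); // prints true
    }
}
```

A bolt that skips a tuple this way should still ack it, otherwise an anchored tuple will eventually time out and be treated as failed.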

Upvotes: 2
