Niubility
Reputation: 577

When using Storm, how can I ensure that a bolt with multiple inputs processes only when all of its inputs have arrived?

The topology looks like this:

[image: The Topology]

How can I ensure that a bolt with multiple inputs processes tuples only once all of its inputs have arrived?

Upvotes: 3

Views: 346

Answers (2)

rph
Reputation: 2639

You might want to take a look here (refer to Batching). For bolts that perform more complex operations, such as aggregating multiple input tuples, you will need to extend BaseRichBolt and manage the anchoring of tuples yourself.

For this you need to declare your own output collector like this:

private OutputCollector outputCollector;

And then initialise it through your override of the prepare method:

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    this.outputCollector = outputCollector;
}

The execute method of BaseRichBolt receives only a single tuple as its argument, so you need to keep track of the anchors yourself and use them when emitting.

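// Input tuples buffered so far; the emitted result is anchored to all of them.
private final List<Tuple> anchors = new ArrayList<Tuple>();

@Override
public void execute(Tuple tuple) {
    if (!isTupleAggregationComplete(anchors, tuple)) {
        anchors.add(tuple);
        return; // still waiting for more inputs
    }
    anchors.add(tuple); // the tuple that completes the set is an anchor too

    // do your computations here!

    // Use the collector stored in prepare().
    outputCollector.emit(anchors, new Values(foo, bar, xpto));

    // Ack every buffered tuple; otherwise they time out and are replayed.
    for (Tuple anchor : anchors) {
        outputCollector.ack(anchor);
    }
    anchors.clear();
}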

You should implement isTupleAggregationComplete with the logic that checks whether the bolt has all the information it needs to proceed with the processing.
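As an illustration only, one possible completeness rule is "every expected upstream component has delivered at least one tuple". The sketch below is plain Java with no Storm dependency; the class name AggregationCheck, the method isComplete and the use of component names as strings are assumptions made for this example, not part of the Storm API. Inside a bolt, the source names would come from tuple.getSourceComponent() on the buffered tuples and on the incoming one.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: complete once every expected source has been seen. */
class AggregationCheck {

    /**
     * @param expected        names of all upstream components the bolt waits for
     * @param bufferedSources source names of the tuples buffered so far
     * @param incomingSource  source name of the tuple that just arrived
     * @return true if buffered plus incoming tuples cover every expected source
     */
    static boolean isComplete(Set<String> expected,
                              List<String> bufferedSources,
                              String incomingSource) {
        Set<String> seen = new HashSet<>(bufferedSources);
        seen.add(incomingSource);
        return seen.containsAll(expected);
    }
}
```

Note that this rule ignores how many tuples each source has sent; if tuples from the same source must not be mixed into one aggregation, a per-source buffer is a better fit.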

Upvotes: 1

Matthias J. Sax
Reputation: 62330

Bolt.execute() is called for each incoming tuple, regardless of which producer emitted it (and you cannot change this). If you want to process multiple tuples from different producers at once, you need to write custom buffering code in the bolt.

  1. You need an input buffer for each producer that can hold incoming tuples (maybe a LinkedList<Tuple> as a bolt member).
  2. For each incoming tuple, add the tuple to the corresponding buffer (you can access the producer information in the tuple's metadata via input.getSourceComponent()).
  3. After adding the tuple to the buffer, check whether every buffer contains at least one tuple. If yes, take one tuple from each buffer and process them (after processing, check the buffers again until at least one buffer is empty). If no, just return and do not process anything.
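The three steps above can be sketched in plain Java, independent of Storm. The class InputJoinBuffer and its offer method are hypothetical names chosen for this example; in a real bolt the type parameter would be Tuple and the producer key would be input.getSourceComponent(). Treat it as a minimal sketch that assumes the set of producers is known up front; it leaves out acking and failure handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: one FIFO buffer per producer, released as aligned groups. */
class InputJoinBuffer<T> {
    // Step 1: one buffer per producer, kept in constructor order.
    private final Map<String, Deque<T>> buffers = new LinkedHashMap<>();

    InputJoinBuffer(Collection<String> producers) {
        if (producers.isEmpty()) {
            throw new IllegalArgumentException("At least one producer is required");
        }
        for (String producer : producers) {
            buffers.put(producer, new ArrayDeque<>());
        }
    }

    /**
     * Buffers one item. Returns one item per producer (in constructor order)
     * if every buffer is now non-empty, or an empty list if still waiting.
     */
    List<T> offer(String producer, T item) {
        Deque<T> buffer = buffers.get(producer);
        if (buffer == null) {
            throw new IllegalArgumentException("Unknown producer: " + producer);
        }
        buffer.addLast(item); // Step 2: store the item under its producer.

        // Step 3: release a group only when every producer has contributed.
        List<T> group = new ArrayList<>();
        for (Deque<T> b : buffers.values()) {
            if (b.isEmpty()) {
                return group; // still empty here: nothing has been removed yet
            }
        }
        for (Deque<T> b : buffers.values()) {
            group.add(b.pollFirst());
        }
        return group;
    }
}
```

Because exactly one item is added per call, at most one group can become complete per call, so this sketch does not need to loop over the buffers again after releasing a group. A bolt's execute method could call offer with each incoming tuple and process the returned list whenever it is non-empty. The buffers are unbounded here, so a producer that runs far ahead of the others will grow its queue.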

Upvotes: 1
