Reputation: 577
How can I ensure that a bolt with multiple inputs processes only when all the inputs have arrived?
Upvotes: 3
Views: 346
Reputation: 2639
You might want to take a look here (refer to Batching). For bolts that perform more complex operations, such as aggregating multiple input tuples, you need to extend BaseRichBolt and manage the anchoring yourself.
To do this, declare your own output collector like this:
private OutputCollector outputCollector;
And then initialise it through your override of the prepare method:
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
The execute method of BaseRichBolt receives only a single tuple as its argument, so you need to implement the logic that keeps track of the anchors and uses them when emitting.
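private final List<Tuple> anchors = new ArrayList<Tuple>();
@Override
public void execute(Tuple tuple) {
if (!isTupleAggregationComplete(anchors, tuple)) {
anchors.add(tuple);
return;
}
// the current tuple completes the aggregation, so anchor on it as well
anchors.add(tuple);
// do your computations here!
outputCollector.emit(anchors, new Values(foo, bar, xpto));
// BaseRichBolt does not ack automatically, so ack every buffered tuple
for (Tuple anchor : anchors) {
outputCollector.ack(anchor);
}
anchors.clear();
}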
You should implement isTupleAggregationComplete with the logic that checks whether the bolt has all the information it needs to proceed with the processing.
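As a rough illustration of what such a check could look like, here is a minimal sketch that assumes "complete" simply means a fixed number of inputs has arrived. The class name CountBasedCompletion and the expectedInputs parameter are hypothetical and not part of Storm, and the generic type T stands in for Storm's Tuple so the snippet compiles on its own:

```java
import java.util.List;

// Hypothetical helper, not part of Storm: T stands in for Storm's Tuple.
class CountBasedCompletion {
    private final int expectedInputs;

    CountBasedCompletion(int expectedInputs) {
        if (expectedInputs < 1) {
            throw new IllegalArgumentException("expectedInputs must be >= 1");
        }
        this.expectedInputs = expectedInputs;
    }

    // Returns true when the incoming item is the last one needed,
    // i.e. the already buffered anchors plus this item reach the expected count.
    <T> boolean isTupleAggregationComplete(List<T> anchors, T incoming) {
        return anchors.size() + 1 >= expectedInputs;
    }
}
```

A count is only one possible criterion. Depending on your topology, you may instead need to check a key, a batch id, or the producing component of each tuple.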
Upvotes: 1
Reputation: 62330
Bolt.execute() is called for each incoming tuple, regardless of which producer it came from (and you cannot change this). If you want to process multiple tuples from different producers at once, you need to write custom UDF code.
For example, buffer the tuples (e.g., in a LinkedList<Tuple> as bolt member) and use input.getSourceComponent() to tell which producer each one came from.
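The buffering idea could be sketched roughly as follows. This is a plain-Java illustration rather than actual bolt code: SourceBuffer and its offer method are hypothetical names, the type parameter T stands in for Tuple, and the source string is assumed to be whatever input.getSourceComponent() returns inside execute().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Storm: buffers items per producer name.
class SourceBuffer<T> {
    // One FIFO queue per expected producer, kept in declaration order.
    private final Map<String, Deque<T>> buffers = new LinkedHashMap<>();

    SourceBuffer(List<String> expectedSources) {
        for (String source : expectedSources) {
            buffers.put(source, new ArrayDeque<>());
        }
    }

    // Buffers the item; once every producer has at least one item waiting,
    // removes and returns the oldest item of each producer.
    Optional<List<T>> offer(String source, T item) {
        Deque<T> queue = buffers.get(source);
        if (queue == null) {
            throw new IllegalArgumentException("Unexpected source: " + source);
        }
        queue.addLast(item);
        for (Deque<T> q : buffers.values()) {
            if (q.isEmpty()) {
                return Optional.empty(); // still waiting for some producer
            }
        }
        List<T> joined = new ArrayList<>();
        for (Deque<T> q : buffers.values()) {
            joined.add(q.removeFirst());
        }
        return Optional.of(joined);
    }
}
```

Inside a bolt, you would call something like offer(input.getSourceComponent(), input) from execute() and only process (and ack) the tuples once a non-empty result comes back. Note that an unbounded buffer can grow without limit if one producer is much faster than the others.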
Upvotes: 1