I'm learning the Trident framework. There are several methods on Trident Streams for aggregating tuples within a batch, including this one, which lets you perform a stateful mapping of the tuples using the Aggregator interface. Unfortunately, there is no built-in counterpart that additionally persists the map state, analogous to the other 9 overloads of persistentAggregate() but taking an Aggregator as an argument.
So how can I implement the desired functionality by combining lower-level Trident and Storm abstractions and tools? The API is hard to explore because there is almost no Javadoc documentation.
In other words, the persistentAggregate() methods let you end stream processing by updating some persistent state:
```
stream of tuples ---> persistent state
```
I want to update persistent state and emit different tuples along the way:
```
stream of tuples ------> stream of different tuples
                  with
            persistent state
```
Stream.aggregate(Fields, Aggregator, Fields) doesn't provide fault tolerance:
```
stream of tuples ------> stream of different tuples
                  with
         simple in-memory state
```
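For context on the fault-tolerance difference: as the Trident state documentation describes it, a transactional state stores each value together with the transaction id (txid) of the batch that last updated it, so a batch replayed after a failure is not applied twice. This relies on a transactional spout, which replays a batch with exactly the same tuples. The sketch below is a simplified plain-Java model of that idea only; it is not the Trident API, and the class, field and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (NOT the Trident API) of a transactional map state:
// each key keeps its count plus the txid of the batch that last updated it.
public class TransactionalCountModel {
    // Hypothetical value holder: the stored count and the txid that produced it.
    static final class Stored {
        final long txid;
        final long count;

        Stored(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    private final Map<String, Stored> state = new HashMap<>();

    // Applies one batch's partial count for a key and returns the stored count afterwards.
    public long apply(long txid, String key, long partialCount) {
        Stored old = state.get(key);
        if (old != null && old.txid == txid) {
            // Same batch seen again (a replay): skip the update so nothing is double counted.
            return old.count;
        }
        long base = (old == null) ? 0L : old.count;
        Stored updated = new Stored(txid, base + partialCount);
        state.put(key, updated);
        return updated.count;
    }
}
```

In real Trident code this bookkeeping is done by the state implementation, not by the aggregator, which is why the question is about reusing persistentAggregate() rather than reimplementing it.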
You can create a new stream from a state using the method TridentState#newValuesStream(). This gives you a stream of the aggregated values as the state is updated, so you can keep processing them downstream.
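Conceptually, groupBy(...).persistentAggregate(...).newValuesStream() turns each batch into a state update plus a stream of the new values for the keys that batch touched. The following is a plain-Java model of that data flow under simplifying assumptions (a single partition, no txids, an in-memory map standing in for MemoryMapState); NewValuesStreamModel and processBatch are hypothetical names, not part of Storm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT the Trident API) of
// groupBy("word").persistentAggregate(..., new Count(), ...).newValuesStream().
public class NewValuesStreamModel {
    // Stands in for the persistent map state; it survives across batches.
    private final Map<String, Long> state = new LinkedHashMap<>();

    // Processes one batch of words and returns what the "new values stream" would carry.
    public List<Map.Entry<String, Long>> processBatch(List<String> words) {
        // 1. Partial aggregation inside the batch (what Count does per group).
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String word : words) {
            batchCounts.merge(word, 1L, Long::sum);
        }
        // 2. Combine each partial count with the stored value and write it back.
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            Long updated = state.merge(e.getKey(), e.getValue(), Long::sum);
            // 3. Emit the updated value downstream instead of ending the stream.
            emitted.add(Map.entry(e.getKey(), updated));
        }
        return emitted;
    }
}
```

In this model only the keys touched by a batch are emitted: feeding ["the", "cow", "the"] and then ["the", "moon"] yields the=2, cow=1 for the first batch and the=3, moon=1 for the second, with nothing for cow.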
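For illustrative purposes, we can extend the word-count example from the Trident documentation by adding this method and a Debug filter:
```java
// Storm 1.x+ package names (pre-1.0 releases used storm.trident.* and backtype.storm.*).
// Split is the word-splitting function defined in the Trident tutorial.
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"),
        new Values("four score and seven years ago"),
        new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
        // Turn the state updates back into a stream and print each new count.
        .newValuesStream().each(new Fields("count"), new Debug());
```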
Running this topology prints the aggregated counts to the console.
Hope it helps.