I am not sure which Flink stream transformation I should use to compute the average of a stream and update some state (say, an array of ints) over a 5-second window.
If I use a RichFlatMapFunction, I can compute the average and update my array state. However, then I have to call
streamSource
    .keyBy(0)
    .flatMap(new MyRichFlatMapFunction())
    .print()
and I cannot apply it to a window. If I use
streamSource
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(new MyAggregateFunction())
    .print()
I cannot keep my array state in a ValueState.
I tried using a RichAggregateFunction and ran into the same problem as in this thread: Flink error on using RichAggregateFunction. Is there another way to compute the average and keep track of additional state in Flink?
How would I approach this problem in Flink? Here is what I am trying, which does not work: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L70
streamStations.filter(new SensorFilter("COUNT_TR"))
        .map(new TrainStationMapper())
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // THIS AGGREGATE DOES NOT WORK
        // .aggregate(new AverageRichAggregator())
        // .print();
public static class AverageRichAggregator extends
        RichAggregateFunction<Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double>, Tuple3<Double, Long, Integer>, Tuple2<String, Double>> {
    private static final long serialVersionUID = -40874489412082797L;
    private String functionName;
    private ValueState<CountMinSketch> countMinSketchState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<CountMinSketch> descriptor = new ValueStateDescriptor<>("countMinSketchState",
                CountMinSketch.class);
        this.countMinSketchState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public Tuple3<Double, Long, Integer> createAccumulator() {
        this.countMinSketchState.clear();
        return new Tuple3<>(0.0, 0L, 0);
    }

    @Override
    public Tuple3<Double, Long, Integer> add(
            Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double> value,
            Tuple3<Double, Long, Integer> accumulator) {
        try {
            if (value.f1.f1.equals("COUNT_PE")) {
                // int count = (int) Math.round(value.f2);
                // countMinSketch.updateSketchAsync("COUNT_PE");
            } else if (value.f1.f1.equals("COUNT_TI")) {
                // int count = (int) Math.round(value.f2);
                // countMinSketch.updateSketchAsync("COUNT_TI");
            } else if (value.f1.f1.equals("COUNT_TR")) {
                // int count = (int) Math.round(value.f2);
                // countMinSketch.updateSketchAsync("COUNT_TR");
            }
            CountMinSketch currentCountMinSketchState = this.countMinSketchState.value();
            currentCountMinSketchState.updateSketchAsync(value.f1.f1);
            this.countMinSketchState.update(currentCountMinSketchState);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new Tuple3<>(accumulator.f0 + value.f2, accumulator.f1 + 1L, value.f1.f4);
    }

    @Override
    public Tuple2<String, Double> getResult(Tuple3<Double, Long, Integer> accumulator) {
        String label = "";
        int frequency = 0;
        try {
            if (functionName.equals("COUNT_PE")) {
                label = "PEOPLE average on train station";
                // frequency = countMinSketch.getFrequencyFromSketch("COUNT_PE");
            } else if (functionName.equals("COUNT_TI")) {
                label = "TICKETS average on train station";
                // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TI");
            } else if (functionName.equals("COUNT_TR")) {
                label = "TRAIN average on train station";
                // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TR");
            }
            frequency = this.countMinSketchState.value().getFrequencyFromSketch(functionName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new Tuple2<>(label + "[" + accumulator.f2 + "] reads[" + frequency + "]",
                ((double) accumulator.f0) / accumulator.f1);
    }

    @Override
    public Tuple3<Double, Long, Integer> merge(Tuple3<Double, Long, Integer> a, Tuple3<Double, Long, Integer> b) {
        return new Tuple3<>(a.f0 + b.f0, a.f1 + b.f1, a.f2);
    }
}
Error:
Exception in thread "main" java.lang.UnsupportedOperationException: This aggregation function cannot be a RichFunction.
    at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:692)
    at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt2.<init>(MultiSensorMultiStationsReadingMqtt2.java:71)
    at org.sense.flink.App.main(App.java:141)
Thanks
Aggregators aren't allowed to keep arbitrary state, because the aggregator might be used with a merging window (such as a session window), and Flink wouldn't know how to merge your ad hoc state.
But you can combine an AggregateFunction with a ProcessWindowFunction, like this:
input
    .keyBy(<key selector>)
    .timeWindow(<duration>)
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());
The process method of the ProcessWindowFunction will be passed an Iterable that contains only the pre-aggregated result, and a Context that provides access to both global and per-window state. Hopefully that will provide what you need in a straightforward way. But if you need to update your own state with each arriving record, you'll need to extend the types managed by the aggregator (its accumulator) to accommodate this.
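A minimal sketch of that last point, assuming the Flink 1.x DataStream API and a simplified input of (label, reading) pairs instead of the question's nested tuples. The per-record state (a hypothetical label-to-count map standing in for the CountMinSketch) lives inside the accumulator next to the sum and count, so add() updates it for every record and merge() can combine it if windows are merged. The class and field names are illustrative, not taken from the original code.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Sketch: input is (label, reading); output is (label frequencies, average reading).
 * The "extra" state is part of the accumulator, so no RichFunction is needed.
 */
class AverageWithFrequencies
        implements AggregateFunction<Tuple2<String, Double>, AverageWithFrequencies.Acc, Tuple2<Map<String, Long>, Double>> {

    /** Accumulator holding everything the window needs, including the extra state. */
    public static class Acc {
        public double sum;
        public long count;
        // Hypothetical stand-in for the CountMinSketch from the question
        public Map<String, Long> frequencies = new HashMap<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Acc add(Tuple2<String, Double> value, Acc acc) {
        acc.sum += value.f1;
        acc.count += 1;
        // Update the extra state for every arriving record
        acc.frequencies.merge(value.f0, 1L, Long::sum);
        return acc;
    }

    @Override
    public Tuple2<Map<String, Long>, Double> getResult(Acc acc) {
        double average = acc.count == 0 ? 0.0 : acc.sum / acc.count;
        return Tuple2.of(acc.frequencies, average);
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        // Because the extra state lives in the accumulator, merging windows can combine it too
        a.sum += b.sum;
        a.count += b.count;
        b.frequencies.forEach((label, n) -> a.frequencies.merge(label, n, Long::sum));
        return a;
    }
}
```

It plugs in like any other AggregateFunction, e.g. .aggregate(new AverageWithFrequencies()), optionally with a ProcessWindowFunction as the second argument. Reusing and returning the input accumulator in add() and merge() is permitted by the AggregateFunction contract and avoids extra allocations.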
Here's a rough outline of how you might use the global state:
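private static class MyProcessWindowFunction extends ProcessWindowFunction<IN, OUT, KEY, TimeWindow> {
    // Per-key state that is shared by all windows of the same key
    private static final ValueStateDescriptor<Long> myGlobalState =
            new ValueStateDescriptor<>("stuff", LongSerializer.INSTANCE);

    @Override
    public void process(KEY key, Context context, Iterable<IN> values, Collector<OUT> out) throws Exception {
        // context.windowState() would instead give state scoped to this key and this window
        ValueState<Long> goodStuff = context.globalState().getState(myGlobalState);
        // read with goodStuff.value(), write with goodStuff.update(...)
    }
}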
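A slightly more complete sketch of the same outline, again assuming the Flink 1.x DataStream API and a hypothetical (key, reading) input of type Tuple2<String, Double>. AverageAggregate computes the average incrementally, and AverageWithWindowCount receives only that average for each window while keeping a per-key counter in global state that survives from one window to the next. The names and the window counter are illustrative stand-ins for whatever state you actually need (for example the array of ints from the question).

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Incremental average: the accumulator is (sum, count), the input is (key, reading). */
class AverageAggregate implements AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Long>, Double> {
    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return Tuple2.of(0.0, 0L);
    }

    @Override
    public Tuple2<Double, Long> add(Tuple2<String, Double> value, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0L ? 0.0 : acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

/**
 * Receives only the pre-aggregated average of each window and keeps per-key state
 * across windows via context.globalState().
 * Output: (key, window average, number of windows seen so far for this key).
 */
class AverageWithWindowCount
        extends ProcessWindowFunction<Double, Tuple3<String, Double, Long>, String, TimeWindow> {

    // Hypothetical per-key counter; replace with the state you actually need
    private final ValueStateDescriptor<Long> windowsSeenDescriptor =
            new ValueStateDescriptor<>("windowsSeen", Long.class);

    @Override
    public void process(String key, Context context, Iterable<Double> averages,
                        Collector<Tuple3<String, Double, Long>> out) throws Exception {
        // With aggregate(agg, windowFunction) the Iterable holds exactly one element
        Double average = averages.iterator().next();
        ValueState<Long> windowsSeen = context.globalState().getState(windowsSeenDescriptor);
        Long previous = windowsSeen.value(); // null for the first window of this key
        long seen = previous == null ? 1L : previous + 1L;
        windowsSeen.update(seen);
        out.collect(Tuple3.of(key, average, seen));
    }
}
```

After keying by the String field, both would be passed together, e.g. .window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(new AverageAggregate(), new AverageWithWindowCount()). Unlike windowState(), the global state is not cleaned up when a window is purged, which is exactly what lets it carry information from one window to the next.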