Reputation: 41
I need to compute a sliding count over a large volume of messages using State and TimerService. The slide is one event and the window is longer than 10 hours. The problem is that checkpointing takes a long time. To improve performance we switched to incremental checkpoints, but checkpointing is still slow. We found that most of the time is spent serializing the timers that are used to clean up old data. We have one timer per key, and there are about 300M timers in total.
Any suggestion for solving this problem would be appreciated. Or could we do the count in another way?
————————————————————————————————————————————
I'd like to add some details. The slide is one event and the window is more than 10 hours (there are about 300 events per second), and we need to react to each event. For this reason we did not use the windows provided by Flink; instead we use keyed state to store the previous information. The timers are used in a ProcessFunction to trigger cleanup of the old data. Finally, the number of distinct keys is very large.
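To make the setup concrete, here is a minimal plain-Java model of the per-key counting, with no Flink dependency. The class name SlidingCounter and its methods are made up for illustration, and the sketch assumes timestamps arrive in non-decreasing order. In the real job the deque would live in keyed state, and evictOlderThan is roughly what the cleanup timer does:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the state kept for ONE key; not Flink API.
class SlidingCounter {
    private final long windowMillis;
    // Timestamps of the events still inside the window, oldest first.
    private final Deque<Long> timestamps = new ArrayDeque<>();

    SlidingCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Called once per event; returns the number of events in the window
    // that ends at this event (the window slides by one event).
    long onEvent(long eventTimeMillis) {
        timestamps.addLast(eventTimeMillis);
        evictOlderThan(eventTimeMillis - windowMillis);
        return timestamps.size();
    }

    // Drops every timestamp strictly older than the cutoff.
    void evictOlderThan(long cutoffMillis) {
        while (!timestamps.isEmpty() && timestamps.peekFirst() < cutoffMillis) {
            timestamps.removeFirst();
        }
    }
}
```

Eviction on each event keeps active keys small; the timers are only needed for keys that stop receiving events.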
Upvotes: 4
Views: 1926
Reputation: 43454
I think this should work:
Dramatically reduce the number of keys Flink is working with from 300M down to 100K (for example), by effectively doing something like keyBy(key mod 100000). Your ProcessFunction can then use a MapState (where the keys are the original keys) to store whatever it needs.
MapStates have iterators, which you can use to periodically crawl each of these maps to expire old items. Stick to the principle of having only one timer per key (per uberkey, if you will), so that you only have 100K timers.
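A minimal plain-Java sketch of these two pieces, with no Flink dependency. The names BucketedExpiry, bucketOf and expireOlderThan are made up for illustration, and the HashMap only stands in for the MapState held under one uberkey. In a real job bucketOf would be the key selector passed to keyBy, and expireOlderThan would run from the single timer of that uberkey:

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical helper; models the idea, not the Flink API.
class BucketedExpiry {
    // Bucket count from the "100K" example above; an assumption, not a tuned value.
    static final int NUM_BUCKETS = 100_000;

    // Maps an original key to its uberkey. floorMod keeps the result
    // in [0, NUM_BUCKETS) even when hashCode() is negative.
    static int bucketOf(String originalKey) {
        return Math.floorMod(originalKey.hashCode(), NUM_BUCKETS);
    }

    // Crawls the map with an iterator and removes every original key whose
    // last-seen timestamp is older than the cutoff. Returns how many were removed.
    static int expireOlderThan(Map<String, Long> lastSeenByKey, long cutoffMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeenByKey.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoffMillis) {
                it.remove(); // safe removal while iterating
                removed++;
            }
        }
        return removed;
    }
}
```

The trade-off is that each timer firing now scans a whole bucket (about 3,000 original keys for 300M keys in 100K buckets), in exchange for checkpointing 100K timers instead of 300M.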
UPDATE:
Flink 1.6 included FLINK-9485, which allows timers to be checkpointed asynchronously, and to be stored in RocksDB. This makes it much more practical for Flink applications to have large numbers of timers.
Upvotes: 3
Reputation: 3422
Rather than registering a clearing timer on each event, how about registering a timer only once per some period, e.g. once per minute? You could register it only the first time a key is seen, and then refresh it in onTimer. Something like:
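new ProcessFunction<SongEvent, Object>() {
    ...
    // Cleanup period; one minute is the example interval, not a tuned value.
    private static final long CLEANUP_INTERVAL_MS = 60_000L;

    @Override
    public void processElement(
            SongEvent songEvent,
            Context context,
            Collector<Object> collector) throws Exception {
        // state.value() is null until a timer has been registered for this key
        // (and again after state.clear() in onTimer).
        Boolean isTimerRegistered = state.value();
        if (isTimerRegistered == null || !isTimerRegistered) {
            long time = context.timerService().currentProcessingTime() + CLEANUP_INTERVAL_MS;
            context.timerService().registerProcessingTimeTimer(time);
            state.update(true);
        }
        // Standard processing
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out)
            throws Exception {
        pruneElements(timestamp);
        if (!elements.isEmpty()) {
            // Data remains for this key: schedule the next cleanup.
            ctx.timerService().registerProcessingTimeTimer(timestamp + CLEANUP_INTERVAL_MS);
        } else {
            // Nothing left: clear the flag so the next event registers a fresh timer.
            state.clear();
        }
    }
}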
Something similar is implemented for the Flink SQL Over clause. You can have a look here
Upvotes: 0
Reputation: 461
What if, instead of using timers, you add an extra field to every element of your stream to store the current processing time or the arrival time? Then, when you want to clean old data out of your stream, you just use a filter operator and check whether the data is old enough to be deleted.
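A rough plain-Java sketch of that check, with no Flink dependency. The Stamped type and the AgeFilter helper are made up for illustration. In a Flink job the isFresh predicate would be the body of the filter operator, and the timestamp would be set when the element enters the stream:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; models the filter predicate only.
class AgeFilter {
    // Hypothetical element type: the payload plus the extra timestamp field.
    static final class Stamped {
        final String payload;
        final long arrivalMillis;

        Stamped(String payload, long arrivalMillis) {
            this.payload = payload;
            this.arrivalMillis = arrivalMillis;
        }
    }

    // True if the element is at most maxAgeMillis old at time nowMillis.
    static boolean isFresh(Stamped e, long nowMillis, long maxAgeMillis) {
        return nowMillis - e.arrivalMillis <= maxAgeMillis;
    }

    // Keeps only the elements that are not yet old enough to be deleted.
    static List<Stamped> keepFresh(List<Stamped> events, long nowMillis, long maxAgeMillis) {
        return events.stream()
                .filter(e -> isFresh(e, nowMillis, maxAgeMillis))
                .collect(Collectors.toList());
    }
}
```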
Upvotes: 0