Reputation: 58
I have the following CEP PatternStream where the DataStream is partitioned by entity ID, because I am only interested in a pattern match when the events have the same entity ID:
PatternStream<EntityMetric> patternStream = CEP.pattern(inputStream.keyBy(EntityMetric.ATTR_ENTITY_ID), thresholdPattern);
But then I noticed that the checkpoint state size increases as the number of entity IDs increases. If I understand checkpointing correctly, this is expected, since the number of keyed operator states increases. But I would like to find out if there is any other way to minimize the checkpoint state size.
Is there a different way to implement this pattern matching without partitioning the DataStream based on entity ID?
Is there any other technique or configuration setting that can help reduce the checkpoint state size?
Thanks!
Upvotes: 1
Views: 801
Reputation: 3422
There is no single easy answer to your question. First of all, what is the size of the state that you want to minimize?
It is not entirely true that the state size grows with the number of entity IDs; rather, it grows with the number of partial matches found. IDs for which there is no partial match do not add to the state size.
Therefore I would advise you to stick with using a KeyedStream.
With FlinkCEP it is quite important not to leave dangling states behind, which can be created e.g. by followedByAny or zeroOrMore. The easiest way to avoid dangling states is to set a time limit for the pattern with within; this way all timed-out partial matches are pruned. Another possibility is to use deterministic contiguity such as next or followedBy, and to apply the until condition on looping patterns.
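For example, a pattern that combines until, next and within could look like the sketch below. This is only an illustration of those operators, not your actual thresholdPattern: the getValue() accessor and the numeric thresholds are assumptions.

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch of a state-friendlier pattern; getValue() and the numeric thresholds
// are assumptions, since the original thresholdPattern is not shown.
Pattern<EntityMetric, ?> thresholdPattern =
    Pattern.<EntityMetric>begin("exceed")
        .where(new SimpleCondition<EntityMetric>() {
            @Override
            public boolean filter(EntityMetric metric) {
                return metric.getValue() > 100.0; // assumed threshold condition
            }
        })
        .oneOrMore()
        // until() bounds the looping part so it stops accumulating events
        .until(new SimpleCondition<EntityMetric>() {
            @Override
            public boolean filter(EntityMetric metric) {
                return metric.getValue() < 50.0; // assumed "back to normal" condition
            }
        })
        // next() uses strict contiguity, which creates fewer alternative
        // partial matches than followedByAny()
        .next("confirm")
        .where(new SimpleCondition<EntityMetric>() {
            @Override
            public boolean filter(EntityMetric metric) {
                return metric.getValue() > 100.0;
            }
        })
        // within() prunes partial matches that do not complete in time
        .within(Time.minutes(10));

// Same keyed stream as in the question; only the pattern definition changes.
PatternStream<EntityMetric> patternStream =
    CEP.pattern(inputStream.keyBy(EntityMetric.ATTR_ENTITY_ID), thresholdPattern);

With such bounds in place, every partial match is dropped from the keyed state once its window expires, so the checkpoint size is driven by the number of currently active partial matches rather than by the total number of entity IDs.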
Upvotes: 3