Alter

Reputation: 1213

Checkpoints increasing over time in Flink

As a follow-up to this question, it is still not clear to me why the checkpoints of my Flink job keep growing over time; after about 7 days of running, they have never reached a plateau. I'm using Flink 1.10 with the FS State Backend, as my job cannot afford the latency cost of RocksDB.

See how the checkpoints evolve over 7 days: [screenshot: checkpoint sizes increasing steadily over 7 days] I have this TTL configuration for the state in all my stateful operators, set to one hour in most cases (or a bit more), and to one day in one case:

public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // TTL timer resets on creation and on every write
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot() // expired entries are dropped when a full snapshot is taken
        .build();

My understanding is that all the objects in state should be cleaned up once the TTL expires, and the checkpoint size should therefore shrink; since we expect roughly the same amount of data every day, the checkpoints should stay more or less the same size.

On the other hand, our traffic follows a daily curve: some hours of the day bring more incoming data, but late at night the traffic drops, and all the expired objects in state should then be cleaned up. So the checkpoint size should shrink at night, not stay at the same size until the traffic ramps up again.

Here is a code sample of one of the use cases:

DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream
        .filter((FilterFunction<Event>) event -> /* apply filters here */ true)
        .name("Events filtered")
        .keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction());


public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event> {

    private final MapStateDescriptor<String, Event> descriptor =
            new MapStateDescriptor<>("prev_state", String.class, Event.class);
    private MapState<String, Event> previousState;

    @Override
    public void open(Configuration parameters) {
        /*ttlConfig described above*/
        descriptor.enableTimeToLive(ttlConfig);
        previousState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(Event event, Collector<Event> collector) throws Exception {
        final String key = event.rType.equals("something") ? event.id1 : event.id2;
        Event previous = previousState.get(key);
        if (previous != null) {
            /*something done here*/
        } else {
            /*something done here*/
            previousState.put(key, previous);
        }
        collector.collect(previous);
    }
}

This is more or less the structure of the use cases; some others use windows (time windows or session windows).

Questions: why do the checkpoint sizes keep growing and never reach a plateau, even though state TTL is configured? Is the TTL actually cleaning up the expired state?

Kind regards!

Upvotes: 0

Views: 2121

Answers (1)

David Anderson

Reputation: 43499

In this stretch of code it appears that you are simply writing back the state that was already there, which only serves to reset the TTL timer. This might explain why the state isn't being expired.

Event previous = previousState.get(key);
if (previous != null) {
  /*something done here*/
} else
  previousState.put(key, previous);
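
If that's what is happening, a minimal fix is to write to state only when there is no previous entry, so that existing entries keep their original TTL timer and can expire. This is only a sketch: storing the incoming event (rather than previous, which is null in the else branch) is an assumption about the intended logic.

Event previous = previousState.get(key);
if (previous != null) {
    // use the previous event, but don't write it back:
    // an unnecessary put() resets the TTL timer
    /*something done here*/
} else {
    // first event for this key: remember it
    // (assumption: the original put(key, previous) meant put(key, event))
    previousState.put(key, event);
}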

It also appears that you should be using ValueState rather than MapState. ValueState effectively provides a sharded key/value store, where the keys are the keys used to partition the stream in the keyBy. MapState gives you a nested map for each key, rather than a single value. But since you are using the same key inside the flatMap that you used to key the stream originally, key-partitioned ValueState would appear to be all that you need.
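
As an illustration, here is a rough sketch of MyFlatMapFunction rewritten with ValueState. The /*something done here*/ placeholders stand in for the original logic; reusing the same ttlConfig and emitting the current event downstream are assumptions.

public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event> {

    private transient ValueState<Event> previousState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Event> descriptor =
                new ValueStateDescriptor<>("prev_state", Event.class);
        descriptor.enableTimeToLive(ttlConfig); // same TTL config as before (assumption)
        previousState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Event event, Collector<Event> collector) throws Exception {
        // the stream is already keyed by id1/id2, so ValueState is
        // automatically scoped to the current key -- no map lookup needed
        Event previous = previousState.value();
        if (previous != null) {
            /*something done here*/
        } else {
            previousState.update(event); // remember the event for new keys (assumption)
        }
        collector.collect(event); // the original collected `previous`
    }
}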

Upvotes: 3
