I am using Flink v1.13.2.
Many of the process functions use registerProcessingTimeTimer to clear state:
public class ProcessA ...
{
    @Override
    public void processElement(Object value, Context ctx, Collector<...> out) throws Exception
    {
        if (...)
        {
            ctx.timerService().registerProcessingTimeTimer(value.getTimestampMs() + 23232);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ValidationResult> out)
    {
        state.clear();
    }
}
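To make the timer-based cleanup pattern above concrete, here is a toy, single-threaded model in plain Java. It has no Flink dependency, the class and method names are hypothetical, and it is not how Flink implements its timer service. Each key holds a value, and each registered timer clears that key's value once the clock reaches it. Like Flink's TimerService, it keeps at most one timer per key and timestamp, so registering the same cleanup time twice is a no-op.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Toy model (no Flink): per-key state plus per-key timers that clear it, as in ProcessA. */
class TimerClearModel {

    /** A registered timer: fires for {@code key} once the clock reaches {@code timestamp}. */
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }

    private final Map<String, String> state = new HashMap<>();
    // Ordered by (timestamp, key); adding an identical timer twice is a no-op,
    // mirroring "at most one timer per key and timestamp".
    private final TreeSet<Timer> timers = new TreeSet<>();

    /** Like processElement + registerProcessingTimeTimer: store the value and schedule cleanup. */
    void process(String key, String value, long cleanupAt) {
        state.put(key, value);
        timers.add(new Timer(cleanupAt, key));
    }

    /** Advances the clock: fires every timer with timestamp <= now and clears that key (like onTimer). */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first().timestamp <= now) {
            Timer fired = timers.pollFirst();
            state.remove(fired.key);
        }
    }

    String get(String key) {
        return state.get(key);
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerClearModel m = new TimerClearModel();
        m.process("a", "x", 1_000L);
        m.process("a", "y", 1_000L); // same key and timestamp: still one timer
        m.process("b", "z", 5_000L);
        System.out.println(m.pendingTimers()); // 2
        m.advanceTo(2_000L);
        System.out.println(m.get("a") + " " + m.get("b")); // null z
    }
}
```

One consequence the model also shows: because onTimer in ProcessA calls state.clear() unconditionally, an earlier timer for a key clears that key's state even if a later element has since registered a later cleanup time for the same key.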
Many other process functions use StateTtlConfig:
public class ProcessB extends...
{
    @Override
    public void open(Configuration parameters)
    {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.minutes(15))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        ValueStateDescriptor descriptor = ...
        descriptor.enableTimeToLive(ttlConfig);
    }

    @Override
    public void processElement(...) throws Exception
    {
    }
}
I am using RocksDB as the state backend.
Questions:
Where are timers created by timerService stored: in RocksDB or in task memory?
By default, in RocksDB. You also have the option to keep your timers on the heap, but unless there are only a few of them, this is a bad idea: checkpointing heap-based timers blocks the main stream-processing thread, and the timers add pressure on the garbage collector.
Where is the time-to-live information created by StateTtlConfig stored?
TTL adds a long (the timestamp of the last modification) to each item of state. It is kept in the state backend, so in RocksDB.
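A toy model in plain Java (no Flink; the names are hypothetical) of what ProcessB's TTL configuration means for each entry: the user value is stored together with one extra long, its last-write time. With UpdateType.OnCreateAndWrite only writes restart the TTL, and with StateVisibility.NeverReturnExpired a read after the TTL has elapsed returns null, even if the entry has not been physically removed yet. Treating an entry as expired once exactly the TTL has elapsed is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (no Flink) of keyed value state with TTL, configured like ProcessB:
 * UpdateType.OnCreateAndWrite and StateVisibility.NeverReturnExpired.
 */
class TtlValueStateModel {

    /** The user value stored together with one extra long: its last-write time. */
    static final class Stamped {
        final String value;
        final long lastWriteMs;

        Stamped(String value, long lastWriteMs) {
            this.value = value;
            this.lastWriteMs = lastWriteMs;
        }
    }

    private final long ttlMs;
    private final Map<String, Stamped> entries = new HashMap<>();

    TtlValueStateModel(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** OnCreateAndWrite: creating or overwriting an entry restarts its TTL. */
    void update(String key, String value, long nowMs) {
        entries.put(key, new Stamped(value, nowMs));
    }

    /**
     * NeverReturnExpired: once the TTL has elapsed the entry reads as null,
     * even though this model never physically deletes it. Reads do not restart the TTL.
     */
    String value(String key, long nowMs) {
        Stamped s = entries.get(key);
        if (s == null || nowMs - s.lastWriteMs >= ttlMs) {
            return null;
        }
        return s.value;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        TtlValueStateModel state = new TtlValueStateModel(15 * minute); // like Time.minutes(15)
        state.update("k", "v", 0L);
        System.out.println(state.value("k", 14 * minute)); // v
        System.out.println(state.value("k", 16 * minute)); // null: the earlier read did not extend the TTL
        state.update("k", "w", 16 * minute);                // a write restarts the TTL
        System.out.println(state.value("k", 30 * minute)); // w
    }
}
```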
Is anything kept in memory when I use timerService or StateTtlConfig?
Not if you are using RocksDB for both state and timers.
If I have millions of keys, which approach should I prefer?
Keep your timers in RocksDB.
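A back-of-the-envelope estimate of the TTL cost described earlier (one extra long, i.e. 8 bytes, per state entry) at the scale of millions of keys. The key count is hypothetical, and the figure ignores RocksDB's own per-entry overhead and compression, so treat it as a rough estimate of the added payload rather than a measurement.

```java
import java.util.Locale;

/** Rough estimate of the extra payload added by TTL timestamps. */
class TtlOverheadEstimate {

    // One long per state entry holds the TTL timestamp.
    static final int TTL_TIMESTAMP_BYTES = Long.BYTES;

    /** Extra uncompressed payload, in bytes, for the given number of state entries. */
    static long extraBytes(long stateEntries) {
        return stateEntries * TTL_TIMESTAMP_BYTES;
    }

    public static void main(String[] args) {
        long entries = 10_000_000L; // hypothetical: 10 million keys, one ValueState entry each
        long bytes = extraBytes(entries);
        System.out.printf(Locale.ROOT, "%d bytes = %.1f MiB%n", bytes, bytes / (1024.0 * 1024.0));
        // prints: 80000000 bytes = 76.3 MiB
    }
}
```

Because this data lives in RocksDB, the cost lands in the state backend rather than on the JVM heap.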
Can creating millions of keys lead to an out-of-memory exception when I use timerService? And when I use StateTtlConfig?
It is always possible to get out-of-memory exceptions with RocksDB, regardless of what you store in it; the native library does not always stay within the memory it has been allocated. But its memory use shouldn't grow without bound, and your choices about timers and state TTL shouldn't make any difference.
Improvements were made in Flink 1.14 (by upgrading to a newer version of RocksDB), but some problems are still being seen. In the worst case you might need to set your actual process memory limit in the OS to something larger than what you tell Flink it can use.