Reputation: 133
I have a ProcessWindowFunction for processing TumblingEventTimeWindows in which I use a state store to preserve some values across multiple tumbling windows.
My problem is that this state store is not being preserved across tumbling windows, i.e. if I first store something in window [0,999] and then access the store from window [1000,1999], the store is empty.
I am aware of the global state and per-window state described here. I want to use global state. I also tried creating a minimal working example to investigate this:
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;

public class twStateStoreTest {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);

        final DataStream<Element> elements = env.fromElements(
                Element.from(1, 500),
                Element.from(1, 1000),
                Element.from(1, 1500),
                Element.from(1, 2000),
                Element.from(99, 9999)
        ).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Element>() {
            long w;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(w);
            }

            @Override
            public long extractTimestamp(Element element, long previousElementTimestamp) {
                w = element.getTimestamp();
                return w;
            }
        });

        elements
                .keyBy(new KeySelector<Element, Integer>() {
                    @Override
                    public Integer getKey(Element element) throws Exception {
                        return element.value;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
                .process(new MyProcessWindowFn())
                .print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
        MapState<Integer, Integer> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            stateStore = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class));
        }

        @Override
        public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
            if (stateStore.get(key) == null) {
                stateStore.put(key, 1);
            } else {
                int previous = stateStore.get(key);
                stateStore.put(key, previous + 1);
            }
            out.collect("State store for " + elements.toString() + " is " + stateStore.entries().toString()
                    + " for window : " + context.window());
        }
    }

    static class Element {
        private final long timestamp;
        private final int value;

        public Element(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public int getValue() {
            return value;
        }

        public static Element from(int value, long timestamp) {
            return new Element(timestamp, value);
        }
    }
}
Here I'm trying to count the number of times the process() function was called for a key. This example works, and the state is indeed preserved across tumbling windows. I've made sure that this example exactly mirrors the actual ProcessWindowFunction, with other unnecessary code stripped out.
But state is not preserved across windows in the actual ProcessWindowFunction!
Is there an obvious gotcha I'm missing? Is there any other reason why state would not be preserved across TumblingEventTimeWindows for a ProcessWindowFunction that uses a MapState defined as follows:
private MapState<IUEventType, Boolean> activeSessionStore;

@Override
public void open(Configuration parameters) throws Exception {
    activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<IUEventType, Boolean>(
            "name", IUEventType.class, Boolean.class));
}
Here's the actual class, with the bloat removed and updated per @David's and @ShemTov's suggestions:
public class IUFeatureStateCombiner extends ProcessWindowFunction<IUSessionMessage, IUSessionMessage, IUMonitorFeatureKey, TimeWindow> {
    private final static MapStateDescriptor<IUEventType, Boolean> desc = new MapStateDescriptor<IUEventType, Boolean>(
            "store", IUEventType.class, Boolean.class);
    private final Logger LOGGER = LoggerFactory.getLogger(IUFeatureStateCombiner.class);

    @Override
    public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
        ...
        MapState<IUEventType, Boolean> activeSessionStore = context.globalState().getMapState(desc);
        Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- This returns an empty iterable
        // even though I populated activeSessionStore with some values in the previous invocation of process()
        ... do something based on lastFeatureStates....
        activeSessionStore.put(...);
    }

    @Override
    public void clear(Context context) throws Exception {
        context.globalState().getMapState(desc).clear();
    }
}
And I invoke it using:
inputStream.keyBy(IUSessionMessage::getMonitorFeatureKey)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
        .process(new IUFeatureStateCombiner());
This still has the problem: I get an empty iterable in the second invocation of process() even though I populated the state in the previous invocation.
Edit: Problem solved. The global state should not be cleared in clear(), since global state is meant to outlive each individual window.
Upvotes: 0
Views: 1533
Reputation: 523
I noticed that in your minimal working example, you created stateStore in the open function and used stateStore directly in the process function, while in the so-called actual class, you created activeSessionStore in the open function but used context.globalState().getMapState(desc) to get the state. As I understand it, you never used the state you created in open and always used the global state. That's why the state became empty once you added the clear function, and why everything worked as expected after you removed it. But as you already verified in your example, you can achieve what you want without the global state, like this:
...
@Override
public void open(Configuration parameters) throws Exception {
    activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<IUEventType, Boolean>(
            "name", IUEventType.class, Boolean.class));
}
...
@Override
public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
    ...
    Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- Finally used the one you created in open function
    ... do something based on lastFeatureStates....
    activeSessionStore.put(...);
}
Upvotes: 0
Reputation: 133
My mistake was that I was using the clear() method incorrectly. Since this is global state, clearing it in clear() wipes it as soon as each tumbling window expires. As David pointed out, global state is never cleared automatically, so we have to define a TTL for unbounded key spaces.
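To make the effect of clear() concrete, here is a toy plain-Java model, not Flink code: the class and method names are hypothetical. It only assumes what the thread describes, namely that for each fired tumbling window process() runs and clear() is then called when the window is purged. A single per-key counter stands in for the global state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Flink code): one key, a sequence of tumbling windows,
// and a "global" per-key counter that survives across windows unless
// the clear() hook wipes it.
public class GlobalStateClearModel {

    // Stand-in for global (per-key, window-independent) state: key -> count.
    private final Map<Integer, Integer> globalCount = new HashMap<>();
    private final boolean clearGlobalStateInClear;

    GlobalStateClearModel(boolean clearGlobalStateInClear) {
        this.clearGlobalStateInClear = clearGlobalStateInClear;
    }

    // Mirrors the counting logic of MyProcessWindowFn.process().
    int process(int key) {
        int next = globalCount.getOrDefault(key, 0) + 1;
        globalCount.put(key, next);
        return next;
    }

    // Mirrors a clear() override that wipes the global state on window purge.
    void clear(int key) {
        if (clearGlobalStateInClear) {
            globalCount.remove(key);
        }
    }

    // Fires `windows` consecutive windows for one key and returns the
    // count that process() observed in each window.
    static List<Integer> run(boolean clearGlobalStateInClear, int key, int windows) {
        GlobalStateClearModel fn = new GlobalStateClearModel(clearGlobalStateInClear);
        List<Integer> observed = new ArrayList<>();
        for (int w = 0; w < windows; w++) {
            observed.add(fn.process(key)); // the window fires
            fn.clear(key);                 // then the window is purged
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run(true, 1, 3));  // [1, 1, 1]
        System.out.println(run(false, 1, 3)); // [1, 2, 3]
    }
}
```

With the clearing enabled, every window sees a count of 1, which matches the empty iterable reported in the question; without it, the count keeps growing across windows.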
Upvotes: 3
Reputation: 43707
You want to do something more like this. And keep in mind that these are per-key state stores (there's a separate map for every key), so where you are doing stateStore.get(key), that doesn't really make sense. Perhaps all you need is ValueState, if you only need to store an Integer for each key.
static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
    private final static MapStateDescriptor<Integer, Integer> mapDesc =
            new MapStateDescriptor<>("stateStore", Integer.class, Integer.class);

    @Override
    public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
        MapState<Integer, Integer> stateStore = context.globalState().getMapState(mapDesc);
        ...
    }
}
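The per-key scoping described above can be sketched with another toy model (plain Java, not Flink's implementation; all names here are made up for illustration). The runtime selects the current key before calling the function, so one value per key, in the spirit of ValueState, is enough for a counter, and looking the key up again inside a MapState adds nothing.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Flink's implementation) of keyed value state: the backend
// stores one value per key and only ever exposes the current key's value.
public class KeyedValueStateModel {

    private final Map<Integer, Integer> valuesByKey = new HashMap<>();
    private Integer currentKey;

    // Stand-in for the runtime selecting the key before invoking the function.
    void setCurrentKey(int key) {
        currentKey = key;
    }

    // Like ValueState.value(): null if nothing was stored for the current key.
    Integer value() {
        return valuesByKey.get(currentKey);
    }

    // Like ValueState.update(): stores a value for the current key only.
    void update(Integer v) {
        valuesByKey.put(currentKey, v);
    }

    // The question's counting logic with a single value per key:
    // the scoping already picked the right key, so no inner key lookup.
    int countInvocation() {
        Integer previous = value();
        int next = (previous == null) ? 1 : previous + 1;
        update(next);
        return next;
    }

    public static void main(String[] args) {
        KeyedValueStateModel state = new KeyedValueStateModel();
        int[] keys = {1, 1, 99, 1};
        for (int k : keys) {
            state.setCurrentKey(k);
            // prints: key 1 -> 1, key 1 -> 2, key 99 -> 1, key 1 -> 3
            System.out.println("key " + k + " -> " + state.countInvocation());
        }
    }
}
```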
Note that the global state store is never cleared. So if you have an unbounded key space, you'll eventually run into problems. You can configure state TTL on the state descriptor(s) to deal with this.
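For reference, the real Flink mechanism is to build a StateTtlConfig via StateTtlConfig.newBuilder(...) and call enableTimeToLive(...) on the state descriptor before the state is first obtained. The sketch below does not use Flink; it is a hypothetical plain-Java model of the idea only: each entry remembers when it was last written, and a read after the TTL has elapsed treats the entry as expired and drops it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Flink's implementation) of state TTL on a per-key counter:
// each entry remembers when it was last written, and reads treat entries
// older than ttlMillis as expired (and remove them).
public class TtlStateModel {

    private static final class Entry {
        final int value;
        final long lastWriteMillis;

        Entry(int value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final Map<Integer, Entry> entries = new HashMap<>();
    private final long ttlMillis;

    TtlStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns the value for `key`, or null if it is absent or expired at `nowMillis`.
    Integer get(int key, long nowMillis) {
        Entry e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.lastWriteMillis >= ttlMillis) {
            entries.remove(key); // expired: cleaned up lazily on access
            return null;
        }
        return e.value;
    }

    // Every write refreshes the entry's timestamp.
    void put(int key, int value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis));
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        TtlStateModel state = new TtlStateModel(1000L);
        state.put(1, 5, 0L);
        System.out.println(state.get(1, 500L));  // 5 (still fresh)
        System.out.println(state.get(1, 1500L)); // null (expired and removed)
        System.out.println(state.size());        // 0
    }
}
```

This model only cleans up lazily on read, so a key that is never read again would stay forever; Flink also has background cleanup strategies, which are worth checking in the state TTL documentation for your version.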
Upvotes: 2
Reputation: 707
As far as I know, you can't get the global state in the overridden open() method, because there is no Context available there.
You need to get it inside process() of the ProcessWindowFunction:
context.globalState().getMapState(<your_Map_State_Descriptor>)
Upvotes: 1