Reputation: 71
I am writing some code for a processElement function in Apache Flink 1.4:
public class ProcessFunctionClass extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {
    private ListState<String> listState;

    public void processElement(Tuple2<String, String> tuple2, Context context, Collector<Tuple2<String, String>> collector) {
        // if the state is empty, start a timer
        if (listState.get().iterator().hasNext() == false)
            context.timerService().registerEventTimeTimer(10000);
        listState.add("someStringToBeStored");
        // ...
    }
}
I have this function for when the timer expires:
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
    Iterable<String> strings = listState.get();
    int cnt = 0;
    int totalLength = 0;
    Iterator<String> it = strings.iterator();
    while (it.hasNext()) {
        cnt++;
        totalLength += it.next().length();
    }
    LOGGER.info("cnt is:" + cnt);
    LOGGER.info("totalLength is:" + totalLength);
    // clearing the state
    listState.clear();
}
However, every time I run the application, the value of cnt is always 1, and the value of totalLength is the length of the particular string being processed at that time. It looks like the state is not being kept. Is it clear from this code what I am doing wrong?
Upvotes: 0
Views: 666
Reputation: 49
The ProcessFunction is not meant to hold the state. You can use a ProcessWindowFunction to hold the state of the elements present in the window.
Upvotes: -1
Reputation: 43707
Process functions use key-partitioned state, meaning there is a separate list for every key. My guess is that there is no key with multiple events in a 10 second period.
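To illustrate the "separate list for every key" point, here is a minimal plain-Java sketch. It does not use the Flink API; the class KeyedListStateSketch and its methods are hypothetical stand-ins that only mimic how keyed list state behaves, assuming the stream is keyed by the first tuple field.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT the Flink API: models "one list per key".
class KeyedListStateSketch {
    // Hypothetical stand-in for keyed ListState: key -> that key's own list.
    private final Map<String, List<String>> statePerKey = new HashMap<>();

    // Mirrors processElement: append the value to the list of this element's key only.
    void processElement(String key, String value) {
        statePerKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Mirrors onTimer for one key: count that key's entries, then clear them.
    int onTimer(String key) {
        List<String> values = statePerKey.remove(key);
        return values == null ? 0 : values.size();
    }

    public static void main(String[] args) {
        KeyedListStateSketch sketch = new KeyedListStateSketch();
        sketch.processElement("a", "x");
        sketch.processElement("b", "y");
        sketch.processElement("a", "z");
        System.out.println(sketch.onTimer("a")); // prints 2: key "a" saw two events
        System.out.println(sketch.onTimer("b")); // prints 1: key "b" saw one event
    }
}
```

If every event in the real job carries a distinct key, each per-key list holds a single element when its timer fires, which would match the cnt of 1 described in the question.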
Upvotes: 2
Reputation: 9265
Your ProcessFunctionClass needs to extend the Flink ProcessFunction.
Upvotes: 0