BlameMe

Reputation: 71

Apache Flink Process Function state is not holding the state

I am writing some code for a processElement function in Apache Flink 1.4:

public class ProcessFunctionClass extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>{

    private ListState<String> listState;

    public void processElement(Tuple2<String, String> tuple2, Context context, Collector<Tuple2<String, String>> collector) throws Exception {

        // if the state is empty, start a timer
        if (!listState.get().iterator().hasNext())
            context.timerService().registerEventTimeTimer(10000);

        listState.add("someStringToBeStored");

        // ...
    }

}

I have this function for when the timer expires:

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
    Iterable<String> strings = listState.get();
    int cnt = 0;
    int totalLength = 0;
    Iterator<String> it = strings.iterator();
    while (it.hasNext()) {
        cnt++;
        totalLength += it.next().length();
    }
    LOGGER.info("cnt is:" + cnt);
    LOGGER.info("totalLength is:" + totalLength);

    // clearing the state
    listState.clear();
}

However, every time I run the application, cnt is always 1 and totalLength is the length of the single string being processed at that time. It looks like the state is not being retained. Is it clear from this code what I am doing wrong?

Upvotes: 0

Views: 666

Answers (3)

Chandan Bansal

Reputation: 49

The ProcessFunction is not meant to hold state. You can use a ProcessWindowFunction to hold the state of the elements present in the window.

Upvotes: -1

David Anderson

Reputation: 43707

Process functions use key-partitioned state, meaning there is a separate list for every key. My guess is that no key has multiple events within a 10-second period.
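To illustrate what "a separate list for every key" implies, here is a minimal plain-Java sketch. It is only a model of the scoping behaviour, not the Flink API: the class KeyedListStateModel and its methods add and countAndClear are hypothetical names made up for this example, and the keys k1 to k4 are invented data. It shows that when each key sees only one event, the count per key is 1, which would match the symptom in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of key-partitioned list state. This is NOT Flink code;
// all names here are hypothetical and exist only for illustration.
class KeyedListStateModel {

    // One independent list per key, mirroring how keyed state is scoped.
    private final Map<String, List<String>> statePerKey = new HashMap<>();

    // Rough analogue of listState.add(value) while processing an element of this key.
    void add(String key, String value) {
        statePerKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Rough analogue of the counting loop in onTimer followed by listState.clear(),
    // which only affects the list belonging to this key.
    int countAndClear(String key) {
        List<String> values = statePerKey.remove(key);
        return values == null ? 0 : values.size();
    }

    public static void main(String[] args) {
        KeyedListStateModel state = new KeyedListStateModel();

        // Three events with three different keys: each key's list holds one element.
        state.add("k1", "a");
        state.add("k2", "b");
        state.add("k3", "c");
        System.out.println(state.countAndClear("k1")); // prints 1

        // Two events sharing one key end up in the same list.
        state.add("k4", "d");
        state.add("k4", "e");
        System.out.println(state.countAndClear("k4")); // prints 2
    }
}
```

If the real job behaves like the first case, checking how the stream is keyed (and how many events arrive per key before the timer fires) would be a reasonable next step.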

Upvotes: 2

kkrugler

Reputation: 9265

Your ProcessFunctionClass needs to extend the Flink ProcessFunction.

Upvotes: 0
