Mohammad Hossein Gerami

Reputation: 1388

Flink ValueState "Error while adding data to RocksDB"

When I try to update the value state (queueState.update(queue)), I get this exception:

org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
    at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:37)
    at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:22)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:300)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
    ... 11 more

2019-10-13 11:06:29,311 WARN  org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor  - Timestamp monotony violated: 1570948458514 < 1570948663062

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.PriorityQueue;

public class CleanTimedOutPartialMatches extends KeyedProcessFunction<String, ObjectNode, ObjectNode> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanTimedOutPartialMatches.class);

    // These constants are used below but weren't included in the snippet;
    // the values here are placeholders.
    private static final String TS = "timestamp";
    private static final String ATTEMPT_ID = "attemptId";
    private static final String AUTOCOMPLETE = "autocomplete";
    private static final String LOG = "log";
    private static final long MAX_TIME_GAP = 30;

    private ValueState<PriorityQueue<JsonNode>> queueState = null;

    @Override
    public void processElement(ObjectNode log, Context context, Collector<ObjectNode> collector) throws Exception {
       try {
           if (context.timestamp() > context.timerService().currentWatermark()) {
               PriorityQueue<JsonNode> queue = queueState.value();
               if (queue == null) {
                   queue = new PriorityQueue<JsonNode>(Comparator.comparingLong(o -> o.get(TS).longValue()));
               }
               queue.add(log);
               queueState.update(queue);
               context.timerService().registerEventTimeTimer(log.get(TS).longValue());
           }
       } catch (Exception e){
           e.printStackTrace();
       }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
        try {
            sendToSink(queueState.value(), ctx, out);
        } catch (Exception e){
            for(StackTraceElement el : e.getStackTrace()){
                LOGGER.info("{}.{}:{}", el.getClassName(), el.getMethodName(), el.getLineNumber());
            }
        }
    }

    private void sendToSink(PriorityQueue<JsonNode> queue, OnTimerContext context, Collector<ObjectNode> out){
        long watermark = context.timerService().currentWatermark();
        JsonNode lastSentLog = null;
        JsonNode log = queue.peek();
        while (log != null && log.get(TS).longValue() <= watermark) {
            if (lastSentLog != null && extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog)) && log.get(TS).longValue() == lastSentLog.get(TS).longValue()) {
                LOGGER.info("duplicated log removed");
            } else {
                if (lastSentLog != null) {
                    long gapTime = Math.abs(log.get(TS).longValue() - lastSentLog.get(TS).longValue()) / 1000;
                    boolean isSameAttempt = (extractLogEndpoint(lastSentLog).equals(AUTOCOMPLETE) && extractLogEndpoint(log).equals(LOG))
                            || (extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog)) && gapTime < MAX_TIME_GAP);
                    if (isSameAttempt) {
                        ((ObjectNode) log).put(ATTEMPT_ID, lastSentLog.get(ATTEMPT_ID).textValue());
                    }
                    }
                }
                lastSentLog = log;
                out.collect((ObjectNode)log);
            }
            queue.remove(log);
            log = queue.peek();
        }
    }

    // extractLogEndpoint is referenced above but wasn't included in the snippet;
    // this placeholder assumes the endpoint is stored in an "endpoint" field.
    private static String extractLogEndpoint(JsonNode log) {
        return log.get("endpoint").textValue();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<PriorityQueue<JsonNode>> descriptor = new ValueStateDescriptor<>(
                // state name
                "sort-partial-matches",
                // type information of state
                TypeInformation.of(new TypeHint<PriorityQueue<JsonNode>>() {
                }));
        queueState = getRuntimeContext().getState(descriptor);
    }
}


Upvotes: 0

Views: 1167

Answers (1)

David Anderson

Reputation: 43697

One problem: it looks like you forgot to call queueState.update(queue) after you are done removing things from the queue.
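
For example, onTimer could persist the drained queue at the end (a sketch based on the code in the question):

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
    PriorityQueue<JsonNode> queue = queueState.value();
    if (queue != null) {
        sendToSink(queue, ctx, out);
        if (queue.isEmpty()) {
            queueState.clear();
        } else {
            // persist the removals; without this update they are lost
            queueState.update(queue);
        }
    }
}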

Even if you do get this working, sorting based on a PriorityQueue with RocksDB as the state backend is going to perform very poorly, as it will have to go through ser/de of the entire queue on every access and update. Unless you are using one of the heap-based state backends, it's recommended to use MapState for sorting, because MapState only does ser/de on individual entries, rather than the entire map. You can use the timestamps as keys for the MapState, and a List of objects as the values. Use timers just as you are doing now to trigger flushing out the contents of each list.
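
A sketch of what that might look like (class and field names are illustrative; the timestamp is assumed to live in a "timestamp" field):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class SortWithMapState extends KeyedProcessFunction<String, ObjectNode, ObjectNode> {

    private MapState<Long, List<ObjectNode>> entries;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<Long, List<ObjectNode>> descriptor = new MapStateDescriptor<>(
                "sorted-entries",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<ObjectNode>>() {}));
        entries = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(ObjectNode log, Context ctx, Collector<ObjectNode> out) throws Exception {
        long ts = log.get("timestamp").longValue();
        if (ts > ctx.timerService().currentWatermark()) {
            // only this one map entry is deserialized and re-serialized,
            // not the whole buffered collection
            List<ObjectNode> bucket = entries.get(ts);
            if (bucket == null) {
                bucket = new ArrayList<>();
            }
            bucket.add(log);
            entries.put(ts, bucket);
            ctx.timerService().registerEventTimeTimer(ts);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
        // the timer fires once the watermark has reached this timestamp,
        // so nothing earlier can still arrive; emit and drop this bucket
        List<ObjectNode> bucket = entries.get(timestamp);
        if (bucket != null) {
            for (ObjectNode log : bucket) {
                out.collect(log);
            }
            entries.remove(timestamp);
        }
    }
}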

Or you could use SQL to do the sorting -- see the answer to this question for an example.
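
For instance, with the Table API (a sketch; logStream, the field names, and the surrounding setup are all assumed):

// assumes logStream is a DataStream of a POJO/Row with endpoint, attemptId, ts fields
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// ts.rowtime declares ts as the event-time attribute
tEnv.registerDataStream("logs", logStream, "endpoint, attemptId, ts.rowtime");

// in streaming mode, ORDER BY must lead with the ascending time attribute
Table sorted = tEnv.sqlQuery("SELECT endpoint, attemptId, ts FROM logs ORDER BY ts");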

Upvotes: 3
