Reputation: 1388
When I update the value state (queueState.update(queue)), I get this exception:
org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
    at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:37)
    at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:22)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:300)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
    ... 11 more

The log also contains this warning around the same time:

2019-10-13 11:06:29,311 WARN org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor - Timestamp monotony violated: 1570948458514 < 1570948663062

Here is my process function:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.PriorityQueue;
public class CleanTimedOutPartialMatches extends KeyedProcessFunction<String, ObjectNode, ObjectNode> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanTimedOutPartialMatches.class);

    // NOTE: TS, ATTEMPT_ID, AUTOCOMPLETE, LOG, MAX_TIME_GAP and extractLogEndpoint(...)
    // are defined elsewhere in the project and omitted here.
    private ValueState<PriorityQueue<JsonNode>> queueState = null;

    @Override
    public void processElement(ObjectNode log, Context context, Collector<ObjectNode> collector) throws Exception {
        try {
            if (context.timestamp() > context.timerService().currentWatermark()) {
                PriorityQueue<JsonNode> queue = queueState.value();
                if (queue == null) {
                    queue = new PriorityQueue<>(Comparator.comparingLong(o -> o.get(TS).longValue()));
                }
                queue.add(log);
                queueState.update(queue);
                context.timerService().registerEventTimeTimer(log.get(TS).longValue());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
        try {
            sendToSink(queueState.value(), ctx, out);
        } catch (Exception e) {
            for (StackTraceElement el : e.getStackTrace()) {
                LOGGER.info("{}.{}:{}", el.getClassName(), el.getMethodName(), el.getLineNumber());
            }
        }
    }

    private void sendToSink(PriorityQueue<JsonNode> queue, OnTimerContext context, Collector<ObjectNode> out) {
        long watermark = context.timerService().currentWatermark();
        JsonNode lastSentLog = null;
        JsonNode log = queue.peek();
        while (log != null && log.get(TS).longValue() <= watermark) {
            if (lastSentLog != null
                    && extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog))
                    && log.get(TS).longValue() == lastSentLog.get(TS).longValue()) {
                LOGGER.info("duplicated log removed");
            } else {
                if (lastSentLog != null) {
                    long gapTime = Math.abs(log.get(TS).longValue() - lastSentLog.get(TS).longValue()) / 1000;
                    boolean isSameAttempt =
                            (extractLogEndpoint(lastSentLog).equals(AUTOCOMPLETE) && extractLogEndpoint(log).equals(LOG))
                            || (extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog)) && gapTime < MAX_TIME_GAP);
                    if (isSameAttempt) {
                        ((ObjectNode) log).put(ATTEMPT_ID, lastSentLog.get(ATTEMPT_ID).textValue());
                    }
                }
                lastSentLog = log;
                out.collect((ObjectNode) log);
            }
            queue.remove(log);
            log = queue.peek();
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<PriorityQueue<JsonNode>> descriptor = new ValueStateDescriptor<>(
                // state name
                "sort-partial-matches",
                // type information of state
                TypeInformation.of(new TypeHint<PriorityQueue<JsonNode>>() {}));
        queueState = getRuntimeContext().getState(descriptor);
    }
}
Upvotes: 0
Views: 1167
Reputation: 43697
One problem: it looks like you forgot to call queueState.update(queue)
after you are done removing things from the queue.
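For example, something along these lines in onTimer (an untested sketch of the fix, reusing your existing fields and methods):

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
    PriorityQueue<JsonNode> queue = queueState.value();
    sendToSink(queue, ctx, out);
    // persist the drained queue; otherwise RocksDB keeps serving the old, ever-growing copy
    queueState.update(queue);
}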
Even if you do get this working, sorting with a PriorityQueue in ValueState on the RocksDB state backend is going to perform very poorly, because every access and every update has to serialize and deserialize the entire queue. Unless you are using one of the heap-based state backends, it's recommended to use MapState for sorting, since MapState only does ser/de on individual entries rather than on the whole collection. You can use the timestamps as the MapState keys and a List of objects as the values, and use timers just as you are doing now to trigger flushing out the contents of a List.
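Here is a rough, untested sketch of that approach (the class and state names are made up, and it leaves out the de-duplication and attempt-id logic from your version):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SortWithMapState extends KeyedProcessFunction<String, ObjectNode, ObjectNode> {

    // timestamp -> all elements seen (for this key) with that timestamp
    private MapState<Long, List<ObjectNode>> entries;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<Long, List<ObjectNode>> descriptor = new MapStateDescriptor<>(
                "entries-by-timestamp",
                TypeInformation.of(new TypeHint<Long>() {}),
                TypeInformation.of(new TypeHint<List<ObjectNode>>() {}));
        entries = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(ObjectNode log, Context ctx, Collector<ObjectNode> out) throws Exception {
        long ts = ctx.timestamp();
        if (ts > ctx.timerService().currentWatermark()) {
            // only this one map entry is deserialized and re-serialized, not the whole state
            List<ObjectNode> bucket = entries.get(ts);
            if (bucket == null) {
                bucket = new ArrayList<>();
            }
            bucket.add(log);
            entries.put(ts, bucket);
            ctx.timerService().registerEventTimeTimer(ts);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
        // fires once the watermark has passed this timestamp, so these elements can safely be emitted
        List<ObjectNode> bucket = entries.get(timestamp);
        if (bucket != null) {
            for (ObjectNode log : bucket) {
                out.collect(log);
            }
            entries.remove(timestamp);
        }
    }
}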
Or you could use SQL to do the sorting -- see the answer to this question for an example.
Upvotes: 3