Reputation: 277
I am getting this error when executing kstreams to get the aggregated count.
Exception in thread "KStreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
Here is the code I am executing
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));
KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().intValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Integer(),Serdes.Long()));
It used to run fine before I added this suppress code. Any idea?
Upvotes: 4
Views: 6114
Reputation: 6593
I don't think it is an issue with the serdes for count(). If you don't pass Materialized, the serdes from the object on which you called count() are used. That chain of serde lookups goes back to the last method where you passed serdes explicitly; in your case that is .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long())). So the serdes themselves are not the problem: count() and suppress(...) will use Serdes.Integer() for the key and Serdes.Long() for the value.
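For completeness, here is a sketch of how those serdes could instead be pinned explicitly at count() via Materialized (the store name "counts-store" is just an example name, and the usual Kafka Streams imports are assumed):
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
        .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ofMillis(5L)))
        .count(Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as("counts-store")
                .withKeySerde(Serdes.Integer())    // explicit key serde for the store
                .withValueSerde(Serdes.Long()))    // explicit value serde for the store
        .suppress(Suppressed.untilWindowCloses(unbounded()));
Either way, the key serde that suppress ends up with is Serdes.Integer(), so this alone would not have prevented your exception.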
I tried to reproduce your exception, and I was able to do so only when I changed the type of the keys (and their Serdes) that were processed by the suppress operator (the grouping key type) and then restarted the application. The exception is thrown when Kafka Streams tries to flush data during the commit.
How I reproduced it:
First, produce several messages and run the following code. The type of the key is important (Long):
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Long, Long> events = stream.map((k, v) -> new KeyValue<Long, Long>((long) v.getPageId(), v.getUserId()));
KGroupedStream<Long, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Long(), Serdes.Long()));
KTable<Windowed<Long>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().longValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Long(),Serdes.Long()));
After 1-2 minutes, stop the application and revert to your original code. The type of the key is important (Integer):
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));
KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().intValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Integer(),Serdes.Long()));
Produce a few messages, wait 10 minutes (depending on your window size), produce a few more messages, and wait until the commit is performed (30 seconds): your exception will be thrown.
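That 30-second figure is the default commit interval. As a rough sketch (assuming the usual StreamsConfig-based setup; the application id and broker address below are placeholders), it is controlled like this:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "events-aggregation");   // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder broker
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);             // default is 30000 ms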
What goes wrong?
The problem is that in suppress(...) the keys of the old messages were serialized using the old serdes.
The suppress(...) operation is performed by KTableSuppressProcessor. It has an internal buffer that stores messages before forwarding them (once they expire) to the next ProcessorNode. Suppress needs a timestamp, so the buffer's message key is a composition of the timestamp and an array of bytes (the business key serialized with the business serdes), and the message value is just an array of bytes (the serialized business value). Summarizing: internally the buffer does not care about the business message types. The internal buffer is materialized in the SUPPRESS changelog topic.
When a message is forwarded to the next ProcessorNode, KTableSuppressProcessor deserializes the buffered key and value back with the business serdes (more on this below).
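A purely conceptual sketch of that buffer-key layout (this is not the actual internal buffer class; the timestamp, topic name, and key value are made up):
long timestamp = 1554000000000L;                                           // record timestamp
byte[] rawKey = Serdes.Long().serializer().serialize("events_topic", 42L); // 8 bytes for a Long key
ByteBuffer bufferKey = ByteBuffer.allocate(Long.BYTES + rawKey.length);
bufferKey.putLong(timestamp); // timestamp part, the only part read during restore
bufferKey.put(rawKey);        // business key kept as opaque bytes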
The question is: why was the exception not thrown at startup, but only after a while?
In the first code snippet above, Long is used as the grouping key. When messages are passed to suppress, it serializes the key to an array of bytes and uses the timestamp together with that byte array as the key of its internal buffer. When the application is stopped, the internal buffer is materialized into the SUPPRESS changelog topic.
If we change the grouping key type to Integer (second code snippet) and start the application, the internal buffer is restored from the SUPPRESS changelog topic. During restoration only the timestamp is extracted from the raw key; the array of bytes that represents the business key is not touched. When new messages are passed to suppress, they are handled as before (the key is serialized to an array of bytes and, together with the timestamp, used as the internal buffer key).
After processing each message, KTableSuppressProcessor checks whether the timestamp of any buffered message has expired and, if so, tries to forward it to the next ProcessorNode. In our example, the keys in the internal buffer consist of a timestamp (long) and an array of bytes representing the business key (8 bytes for a Long, 4 bytes for an Integer). So just before forwarding, KTableSuppressProcessor will try to deserialize those arrays (which have different lengths) using IntegerDeserializer. The array of bytes representing a Long is too long, and IntegerDeserializer throws an exception. This happens not at application startup, but when a commit is performed.
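A minimal standalone illustration of that failure mode (a sketch, separate from the application code above):
byte[] eightBytes = new LongSerializer().serialize("any-topic", 42L);          // 8-byte payload from the old key type
Integer key = new IntegerDeserializer().deserialize("any-topic", eightBytes);  // expects exactly 4 bytes
// throws org.apache.kafka.common.errors.SerializationException:
// Size of data received by IntegerDeserializer is not 4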
Another question might be: why is the exception not thrown if we run both versions of the program without suppress? KStreamWindowAggregate (which is responsible for the aggregation) only forwards an aggregated message when its value has been modified. Because we changed the serdes, we don't modify the old aggregates (their keys serialize to different arrays of bytes) but rather put new ones. KTableSuppressProcessor, on the other hand, forwards all expired messages, even those that were buffered with the old serdes.
Upvotes: 6