Reputation: 335
I'm doing some Kafka Streams aggregation and writing the aggregated records to a topic, and I'm getting the errors below. I'm using a custom JSON serde for the aggregation helper class. The solution to this problem that I found on some blogs is to increase max.request.size.
Although I increased max.request.size from the default to 401391899, the serialized aggregation message size keeps increasing on subsequent writes to the topic.
After running the streams application for about 10 minutes, the error below shows up. I'm not sure whether the problem is with my serde or whether I should change some config setting other than max.request.size to overcome this problem.
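For context, this is roughly how I understand the max.request.size override is passed through to the Streams-internal producer (a sketch only; the application id, bootstrap servers, and value are placeholders, not my actual setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "agg-app");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// "max.request.size" is a producer-level setting; it is picked up by the producer Streams creates internally
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 401391899);
StreamsConfig config = new StreamsConfig(props);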
Messages written to the topic:
{A=5, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=6, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=7, B=1, C=2, D=87, E=1, F=0.4482758620689655 }
org.apache.kafka.common.errors.RecordTooLargeException: The message is 2292506 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Exception in thread "StreamThread-1" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at com.google.gson.stream.JsonReader.nextString(JsonReader.java:1043)
at com.google.gson.stream.JsonReader.nextValue(JsonReader.java:784)
at com.google.gson.stream.JsonReader.nextInArray(JsonReader.java:693)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:376)
at com.google.gson.stream.JsonReader.hasNext(JsonReader.java:349)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:93)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:172)
at com.google.gson.Gson.fromJson(Gson.java:795)
at com.google.gson.Gson.fromJson(Gson.java:761)
at com.google.gson.Gson.fromJson(Gson.java:710)
at com.google.gson.Gson.fromJson(Gson.java:682)
at com.data.agg.streams.JsonDeserializer.deserialize(JsonDeserializer.java:34)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:156)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:222)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:205)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:149)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
[Thread-2] INFO org.apache.kafka.streams.KafkaStreams - Stopped Kafka Stream process
Upvotes: 0
Views: 2639
Reputation: 62350
This is a shot in the dark, since you did not share your code, but I assume that within your window aggregation you assemble the input records into a larger and larger record that you maintain as the aggregation result.
Because state is backed by a Kafka topic for fault tolerance, Streams writes a record into this topic (one record per key, with the value being the state that belongs to that key). As your state (per key) grows over time, the "state records" grow over time as well, and eventually exceed the max.request.size bound.
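In code, the pattern I am guessing at would look roughly like this (a sketch against the 0.10.x DSL that matches your stack trace; the aggregate class, topic name, and store name are made up, and aggHelperSerde stands in for your custom Gson-based serde):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical aggregation helper: the List field is what makes the per-key state grow.
public class AggHelper {
    int a, b, c, d, e;
    double f;
    List<String> seenRecords = new ArrayList<>();   // keeps every input record
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> input = builder.stream("input-topic");

// aggHelperSerde would be your custom Gson-based Serde<AggHelper> (assumed, not shown here)
KTable<Windowed<String>, AggHelper> aggregated = input
    .groupByKey(Serdes.String(), Serdes.String())
    .aggregate(
        AggHelper::new,                       // initializer: empty aggregate
        (key, value, agg) -> {
            agg.seenRecords.add(value);       // every record is appended -> aggregate grows without bound
            return agg;
        },
        TimeWindows.of(10 * 60 * 1000L),
        aggHelperSerde,
        "agg-store");

If the output only needs the counts and ratios shown in your sample messages, keeping running totals in the aggregate instead of the raw records keeps the per-key state, and thus the changelog record, bounded.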
Upvotes: 1