Reputation: 21
We are using a Kafka Streams state store in our project, and we want to store more than 1 MB of data, but we got the exception below:
The message is 1760923 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Then I followed the link Add prefix to StreamsConfig to enable setting default internal topic configs and added the following config:
topic.max.request.size=50000000
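For reference, assuming the configuration is built programmatically (application id and bootstrap servers below are placeholders), the same prefixed setting can be written in Java like this:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
// The "topic." prefix tells Streams to forward this value as a config for the
// internal topics it creates (this is the setting that later causes trouble).
props.put(StreamsConfig.topicPrefix("max.request.size"), "50000000");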
The application then works fine, and it keeps working as long as the state store's internal topic already exists. But when Kafka is restarted and the state store topic is lost/deleted, the Kafka Streams processor needs to create the internal state store topic automatically on application start, and at that moment it throws an exception:
"Aorg.apache.kafka.streams.errors.StreamsException: Could not create topic data-msg-seq-state-store-changelog. at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148)....
.....
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: max.request.size".
A workaround is to create the internal topic manually, but that does not seem like a good solution.
Can you help me with this issue? Is there any config I have missed?
Thanks very much.
17 June 2020 update: the issue is still not resolved. Can anyone help?
Upvotes: 2
Views: 4949
Reputation: 81
The solution you are looking for lies in the Kafka Streams configuration properties that you set before starting the stream.
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");
The value I used here is 5 MB in bytes. You can change the value to suit your needs.
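For completeness, a minimal sketch of a Streams configuration with this producer setting applied (application id and bootstrap servers are placeholders; the broker/topic message size limits may also need to be raised so the broker accepts records this large):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
// Producer-level limit for the embedded producers that write to changelog/repartition
// topics. Unlike a "topic."-prefixed setting, this is never sent to the broker during
// internal topic creation, so topic creation does not fail.
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880"); // 5 MB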
Upvotes: 6
Reputation: 9357
I don't see a topic configuration called max.request.size. Maybe it is max.message.bytes (Topic configuration reference), so you may try setting this.
You can also refer to the broker setting message.max.bytes and increase it; that sets the limit at the broker level.
Documentation states:
The largest record batch size allowed by Kafka (after compression if compression is enabled). If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level max.message.bytes config.
Default: 1048588 (~1 MB) (Confluent Kafka)
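For the Streams internal topics specifically, max.message.bytes is a valid topic config, so (unlike max.request.size) it can be passed through with the "topic." prefix. A sketch, with an arbitrary ~50 MB value:

import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Topic-level limit applied to the changelog/repartition topics Streams creates.
// The broker-level message.max.bytes may also need to be increased to match.
props.put(StreamsConfig.topicPrefix(TopicConfig.MAX_MESSAGE_BYTES_CONFIG), "52428800"); // ~50 MB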
Also refer to the following Stack Overflow answer.
Upvotes: 0