Reputation: 371
Quarkus' Kafka-Streams extension provides a convenient way to start a pipeline. Necessary configuration options for the stream application, e.g., quarkus.kafka-streams.bootstrap-servers=localhost:9092
must be inserted in the application.properties
file for the encompassing project.
Quarkus also provides a pass-through option for a finer configuration. The documentation states:
All the properties within the kafka-streams namespace are passed through as-is to the Kafka Streams engine. Changing their values requires a rebuild of the application.
With that, we can for example pass-through a custom timestamp extractor (or any other configuration property that related to the streams configuration)
...
kafka-streams.default.timestamp.extractor=my.custom.extractor.class
...
On startup, the Quarkus extension prints out that this config property is not known, however we can see that it was correctly passed-through since my.custom.extractor.class
shows up in the automatically printed configuration of the stream application.
How can we pass-through options to the underlying Kafka producer and consumer and validate their usage? For example, I want to change the underlying producer's max.request.size
and the consumer's max.partition.fetch.bytes
properties.
The native Kafka Streams and the old Quarkus approach allow me to directly change the passed configuration object in code, e.g.,
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 6000000);
IMO this is dirty in Quarkusland and I would like to pass through all options via the application.properties
file.
Upvotes: 1
Views: 1131
Reputation: 4438
You can just pass it using standard configuration for consumers and producers, prefixed with kafka-streams:
kafka-streams.consumer.$property
kafka-streams.producer.$property
Check -> https://quarkus.io/guides/kafka-streams
Upvotes: 1