Reputation: 3955
I need to create Kafka streams dynamically from config files, each of which contains the source topic name and the settings for one stream. The app needs to run tens of Kafka streams, and the set of streams differs per environment (e.g. stage, prod).
Is it possible to do that with the spring-kafka library?
We can do that easily with plain kafka-streams:
@Bean
public List<KafkaStreams> kafkaStreams() {
    return streamRouteProperties.stream()
        .map(routeProperty -> createKafkaStream(routeProperty))
        .collect(toList());
}

private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
    Topology topology = builder.build();
    StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
    return new KafkaStreams(topology, streamsConfig);
}
and we need to implement Spring's SmartLifecycle interface so that all streams are started and closed automatically.
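For illustration, here is a minimal sketch of that lifecycle wrapper. To keep it compilable without Kafka or Spring on the classpath, ManagedStream and Lifecycle are hypothetical stand-ins: the first mirrors the start() and close() methods of KafkaStreams, the second mirrors the start(), stop() and isRunning() methods of SmartLifecycle. StreamsLifecycle is also just a name chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.streams.KafkaStreams,
// which exposes start() and close() with the same meaning.
interface ManagedStream {
    void start();
    void close();
}

// Hypothetical stand-in for the core methods of Spring's SmartLifecycle.
// In a real app, implement org.springframework.context.SmartLifecycle instead.
interface Lifecycle {
    void start();
    void stop();
    boolean isRunning();
}

final class StreamsLifecycle implements Lifecycle {
    private final List<ManagedStream> streams;
    // Tracks what was actually started, so a failed start can still be cleaned up.
    private final List<ManagedStream> started = new ArrayList<>();
    private volatile boolean running;

    StreamsLifecycle(List<? extends ManagedStream> streams) {
        this.streams = List.copyOf(streams);
    }

    @Override
    public synchronized void start() {
        if (running) {
            return; // already started, nothing to do
        }
        for (ManagedStream stream : streams) {
            stream.start();
            started.add(stream);
        }
        running = true;
    }

    @Override
    public synchronized void stop() {
        // Close in reverse start order.
        for (int i = started.size() - 1; i >= 0; i--) {
            started.get(i).close();
        }
        started.clear();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}
```

Registered as a single bean that receives the List<KafkaStreams> from above, a class shaped like this would let the application context start every stream on startup and close them all on shutdown.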
Is it possible to do the same using spring-kafka?
As far as I can see, each Kafka stream has to be created in code, and I don't see a way to create a list of Kafka streams using StreamsBuilderFactoryBean.
For each required stream I need to do the following:
@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
    Consumed<String, String> consumed = ..;
    KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
    kStream.process(() -> eventProcessor);
    return kStream;
}

@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
But how can I create a list of Kafka streams dynamically using StreamsBuilderFactoryBean?
Upvotes: 3
Views: 2706
Reputation: 121560
The StreamsBuilderFactoryBean just provides an opinionated, convenient API for the Spring application context, but that doesn't mean you always have to be tied to it.
Luckily, the StreamsBuilderFactoryBean doesn't add much value over regular Kafka Streams. The most it does is lifecycle control over the KafkaStreams instance it creates internally.
Feel free to stay with the raw Kafka Streams API, and don't overcomplicate your code by trying to bend the StreamsBuilderFactoryBean to your requirements; it is really designed for one static set of options.
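With the raw API, the "set of options" becomes one Properties object per stream, since KafkaStreams can be constructed from a Topology and a Properties. A minimal sketch of assembling those, assuming shared defaults plus per-stream overrides read from your config files: StreamSettings, forStream and the "<topic>-stream" naming scheme are hypothetical names for this example, while "application.id" is the real Kafka Streams config key. As far as I know, instances running different topologies should not share an application.id, which is why the sketch derives one per topic when none is configured.

```java
import java.util.Map;
import java.util.Properties;

final class StreamSettings {
    private StreamSettings() {
    }

    // Builds the Properties for one stream: shared defaults first,
    // then the per-stream overrides on top of them.
    static Properties forStream(String topicName,
                                Map<String, String> defaults,
                                Map<String, String> overrides) {
        Properties props = new Properties();
        props.putAll(defaults);
        props.putAll(overrides);
        // Only fill in application.id when the config did not set one.
        // The "<topic>-stream" scheme is an assumption of this sketch.
        props.putIfAbsent("application.id", topicName + "-stream");
        return props;
    }
}
```

The result could then be passed to new KafkaStreams(topology, props) inside a method like createKafkaStream from the question.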
Upvotes: 4