Vasyl Sarzhynskyi

Reputation: 3955

Spring Kafka - create streams dynamically

I need to create Kafka streams dynamically from config files, which contain the source topic name and the settings for each stream. The app needs tens of Kafka streams, and the set of streams differs per environment (e.g. stage, prod). Is it possible to do that with the spring-kafka library?

We can do that easily with plain kafka-streams:

@Bean
public List<KafkaStreams> kafkaStreams() {
    return streamRouteProperties.stream()
            .map(routeProperty -> createKafkaStream(routeProperty))
            .collect(toList());
}

private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
    Topology topology = builder.build();
    StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
    return new KafkaStreams(topology, streamsConfig);
}

We also need to implement Spring's SmartLifecycle interface so that all streams are started and closed automatically.

Is it possible to do the same using spring-kafka? As far as I can see, each Kafka stream has to be declared in code, and I don't see how to create a list of Kafka streams using StreamsBuilderFactoryBean. For each required stream I need to do the following:

@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
    Consumed<String, String> consumed = ..;
    KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
    kStream.process(() -> eventProcessor);
    return kStream;
}

@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

But how can I create a list of Kafka streams dynamically using StreamsBuilderFactoryBean?

Upvotes: 3

Views: 2706

Answers (1)

Artem Bilan

Reputation: 121560

The StreamsBuilderFactoryBean just provides an opinionated, convenient API for the Spring application context, but that doesn't mean you always have to be tied to it.

Luckily, the StreamsBuilderFactoryBean doesn't add much value over regular Kafka Streams. The most it does is lifecycle control over the KafkaStreams instance it creates internally.

Feel free to stay with the raw Kafka Streams API, and don't overcomplicate your code by trying to bend the StreamsBuilderFactoryBean to your requirements; it is really designed for one static set of options.
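
If you stay with the raw API, the lifecycle part is small enough to write yourself. Below is a minimal sketch, not a definitive implementation. To keep it free of Spring and Kafka dependencies it uses a hypothetical ManagedStream interface (my assumption, not part of either library) in place of KafkaStreams, and the StreamsLifecycle class name is also made up. Its start(), stop() and isRunning() methods mirror Spring's SmartLifecycle, so in a real application the class could declare that interface.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical start/close pair standing in for a KafkaStreams instance. */
interface ManagedStream {
    void start();
    void close();

    /** Adapts two callbacks, e.g. method references to an existing object. */
    static ManagedStream of(Runnable onStart, Runnable onClose) {
        return new ManagedStream() {
            @Override public void start() { onStart.run(); }
            @Override public void close() { onClose.run(); }
        };
    }
}

/**
 * Starts and closes a list of streams together.
 * The method names mirror Spring's SmartLifecycle (start, stop, isRunning).
 */
class StreamsLifecycle {
    private final List<ManagedStream> streams;
    private volatile boolean running;

    StreamsLifecycle(List<ManagedStream> streams) {
        // Defensive copy so later changes to the caller's list have no effect
        this.streams = new ArrayList<>(streams);
    }

    public synchronized void start() {
        if (running) {
            return; // already started, nothing to do
        }
        streams.forEach(ManagedStream::start);
        running = true;
    }

    public synchronized void stop() {
        if (!running) {
            return; // never started or already stopped
        }
        // Close in reverse order of start
        for (int i = streams.size() - 1; i >= 0; i--) {
            streams.get(i).close();
        }
        running = false;
    }

    public boolean isRunning() {
        return running;
    }
}
```

With real Kafka Streams, each instance from the question's createKafkaStream(..) could presumably be wrapped as ManagedStream.of(ks::start, ks::close), or the class could hold the List<KafkaStreams> directly and call start() and close() on each element.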

Upvotes: 4
