Igor Dumchykov

Reputation: 397

Spring Kafka: re-create Kafka Streams topology at runtime

I have an application based on Spring Boot, spring-kafka and kafka-streams. When the application starts up, it creates a Kafka Streams topology from a default list of topics. What I need to do is edit or re-create the topology at runtime. For example, while the application is already running, a new topic name arrives and I want to add that topic to my topology. Currently I'm thinking about deleting the existing topology, closing and cleaning up KafkaStreams, re-running the logic that creates the topology (this time with the new topic name), and starting KafkaStreams again. I don't want to restart my application. Can someone suggest how to do this at runtime?

Upvotes: 1

Views: 1609

Answers (1)

Igor Dumchykov

Reputation: 397

I found one solution: extend StreamsBuilderFactoryBean so that it hands out a fresh StreamsBuilder after each stop:

@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
    return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}

public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

    private StreamsBuilder instance;

    public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
        super(streamsConfig);
    }

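    // Not a singleton: the parent AbstractFactoryBean then calls
    // createInstance() on every getObject() instead of caching one instance.
    @Override
    public boolean isSingleton() {
        return false;
    }

    // Reuse the current builder until stop() discards it.
    @Override
    protected synchronized StreamsBuilder createInstance() {
        if (instance == null) {
            instance = new StreamsBuilder();
        }
        return instance;
    }

    // Drop the builder so the next getObject() returns a fresh one.
    @Override
    public synchronized void stop() {
        instance = null;
        super.stop();
    }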
}

Then, when I build the topology, I obtain the builder from StreamsBuilderFactoryBean#getObject() instead of injecting a StreamsBuilder directly:

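@Component
public class DynamicStream {

    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    // constructor injection; required because the field is final
    public DynamicStream(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        this.streamsBuilderFactoryBean = streamsBuilderFactoryBean;
    }

    // getObject() is inherited from AbstractFactoryBean and declares a checked Exception
    public void init() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
        // build topology on this builder
    }

    // call this method when stream reconfiguration is needed
    public void reinitialize() throws Exception {
        streamsBuilderFactoryBean.stop();   // discards the old builder
        init();                             // rebuilds the topology on a fresh builder
        streamsBuilderFactoryBean.start();
    }
}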
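
Since every call to reinitialize() stops and restarts the streams, it may be worth triggering it only when the set of topics actually changes. Here is a minimal sketch of that idea in plain Java. TopicRegistry and its methods are hypothetical names of mine, not part of spring-kafka or kafka-streams, and the rebuild callback stands in for whatever performs the stop / rebuild / start sequence:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a spring-kafka class): tracks the topics the
 * topology should read from and triggers a rebuild only on a real change.
 */
final class TopicRegistry {

    // Insertion-ordered so the default topics stay first in the list.
    private final Set<String> topics = new LinkedHashSet<>();

    // Receives the full topic list; in the application this is assumed to
    // stop the factory bean, rebuild the topology, and start it again.
    private final Consumer<List<String>> rebuild;

    TopicRegistry(List<String> defaults, Consumer<List<String>> rebuild) {
        this.topics.addAll(defaults);
        this.rebuild = rebuild;
    }

    /** Returns true if the topic was new and a rebuild was triggered. */
    synchronized boolean addTopic(String topic) {
        if (!topics.add(topic)) {
            return false; // already known: skip the needless restart
        }
        rebuild.accept(new ArrayList<>(topics));
        return true;
    }

    /** Copy of the current topic list, e.g. for init() to subscribe to. */
    synchronized List<String> snapshot() {
        return new ArrayList<>(topics);
    }
}
```

In the application, the callback would call reinitialize() (handling any checked exception it declares), and init() would read snapshot() to decide which topics to subscribe to. Both wirings are assumptions on my side, so adapt them to how your topology is actually built.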

Upvotes: 1
