Keaz

Reputation: 985

Pause/Start Kafka Streams processors in Spring Boot

I'm going to implement the circuit breaker pattern for messaging. The basic requirement is that if the microservice cannot publish messages to its publishing topic, it should stop accepting messages from the other Kafka topics. When the publishing topic becomes available again, the microservice should resume accepting messages from those topics.

Is there a way to achieve this with Kafka Streams in Spring Boot?

Upvotes: 1

Views: 622

Answers (1)

Keaz

Reputation: 985

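I was able to achieve this by using Spring Cloud Stream's BindingsEndpoint, which exposes the application's bindings so they can be stopped and started at runtime.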

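import java.util.List;
import java.util.stream.Collectors;

import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.endpoint.BindingsEndpoint;

// Injected by Spring (for example through the constructor).
private final BindingsEndpoint binding;

// Stops every binding, so the service no longer consumes from its input topics.
@Override
public void stop() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Stopping Kafka topics");
        List<Binding<?>> bindings = getBindings(objects);
        bindings.forEach(Binding::stop);
        log.info("Stopped Kafka topics");
    }
}

// Restarts every binding once the publishing topic is reachable again.
@Override
public void start() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Starting Kafka topics");
        List<Binding<?>> bindings = getBindings(objects);
        bindings.forEach(Binding::start);
        log.info("Started Kafka topics");
    }
}

// Keeps only the Binding instances from the endpoint's state list.
protected List<Binding<?>> getBindings(List<?> objects) {
    return objects.stream()
            .filter(Binding.class::isInstance)
            .map(object -> (Binding<?>) object)
            .collect(Collectors.toList());
}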
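
The answer does not show what calls start() and stop(), so here is a minimal sketch of one way to drive them from publish results. Everything in it is an assumption for illustration: ConsumerGate is a hypothetical interface standing in for the class that holds the start()/stop() methods above, PublishCircuitBreaker and its failure threshold are invented names, and the code is plain Java with no Spring dependency.

```java
/** Hypothetical abstraction over the start()/stop() methods shown above. */
interface ConsumerGate {
    void start();
    void stop();
}

/**
 * Hypothetical breaker: stops consumption after a number of consecutive
 * publish failures and restarts it on the first reported success.
 */
final class PublishCircuitBreaker {
    private final ConsumerGate gate;
    private final int failureThreshold;
    private int consecutiveFailures = 0;
    private boolean open = false;

    PublishCircuitBreaker(ConsumerGate gate, int failureThreshold) {
        if (failureThreshold < 1) {
            throw new IllegalArgumentException("failureThreshold must be >= 1");
        }
        this.gate = gate;
        this.failureThreshold = failureThreshold;
    }

    /** Call when a publish attempt (or a health probe of the topic) fails. */
    synchronized void onPublishFailure() {
        consecutiveFailures++;
        // Stop the consumers only once, when the threshold is first reached.
        if (!open && consecutiveFailures >= failureThreshold) {
            open = true;
            gate.stop();
        }
    }

    /** Call when a publish attempt (or a health probe of the topic) succeeds. */
    synchronized void onPublishSuccess() {
        consecutiveFailures = 0;
        // Restart the consumers only if they were stopped by this breaker.
        if (open) {
            open = false;
            gate.start();
        }
    }

    /** True while consumption is stopped. */
    synchronized boolean isOpen() {
        return open;
    }
}
```

Note that once the consumers are stopped, no regular publish will happen, so something else (for example a scheduled probe of the publishing topic) would have to call onPublishSuccess() to close the breaker again.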

Upvotes: 1
