EarthsPiligrim

Reputation: 61

How can I make manual resume work on a Kafka topic subscription?

Expected Behavior: I expect the receiver to resume consuming events from the topic once consumer.resume(consumer.assignment()) is called.

Actual Behavior: It throws the error "You must call one of receive*() methods before using doOnConsumer".

My code logic:

public <T> void applyCircuitBreaker(CircuitBreaker circuitBreaker, KafkaReceiver<T, T> kafkaReceiver) {
    circuitBreaker.getEventPublisher().onStateTransition((event) -> {
        if (event.getStateTransition() == StateTransition.CLOSED_TO_OPEN) {
            this.pauseReceiver(kafkaReceiver);
        } else if (event.getStateTransition() == StateTransition.OPEN_TO_HALF_OPEN || event.getStateTransition() == StateTransition.HALF_OPEN_TO_CLOSED) {
            this.resumeReceiver(kafkaReceiver);
        }
    });
}

private <T> void pauseReceiver(KafkaReceiver<T, T> kafkaReceiver) {
    kafkaReceiver.doOnConsumer((consumer) -> {
        consumer.pause(consumer.assignment());
        return consumer;
    }).doOnSuccess((success) -> {
        log.info("Successful pause {}", success.paused());
    }).doOnError((e) -> {
        log.error("Error in pausing", e);
    }).subscribe();
}

private <T> void resumeReceiver(KafkaReceiver<T, T> kafkaReceiver) {
    kafkaReceiver.doOnConsumer((consumer) -> {
        consumer.resume(consumer.assignment());
        return consumer;
    }).doOnSuccess((success) -> {
        log.info("Successfully resumed kafka receiver {}", success.assignment());
    }).doOnError((e) -> {
        log.error("Error in resuming", e);
    }).subscribe();
    log.info("resumeReceiver: Kafka receiver resumed.");
}

I am looking for suggestions in case I am missing anything here.
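
For reference, the error text suggests that doOnConsumer only works while one of the receive*() pipelines on the same KafkaReceiver instance is actively subscribed. Below is a toy model of that kind of guard in plain Java. ToyReceiver, ToyConsumer, and the partition names are hypothetical stand-ins made up for illustration, not reactor-kafka classes, and the real library's internals may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Toy model of a "consumer exists only while receiving" guard. NOT reactor-kafka code. */
class ToyReceiver {

    /** Hypothetical stand-in for the underlying Kafka consumer. */
    static final class ToyConsumer {
        private final Set<String> assignment = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        private final Set<String> paused = new HashSet<>();

        Set<String> assignment() { return Collections.unmodifiableSet(new HashSet<>(assignment)); }
        Set<String> paused() { return Collections.unmodifiableSet(new HashSet<>(paused)); }
        void pause(Set<String> partitions) { paused.addAll(partitions); }
        void resume(Set<String> partitions) { paused.removeAll(partitions); }
    }

    // Holds the consumer only while a receive() "subscription" is active.
    private final AtomicReference<ToyConsumer> active = new AtomicReference<>();

    /** Models subscribing to receive(); the returned Runnable models cancelling it. */
    Runnable receive() {
        active.set(new ToyConsumer());
        return () -> active.set(null);
    }

    /** Models doOnConsumer: fails when no receive() subscription is active. */
    <R> R doOnConsumer(Function<ToyConsumer, R> fn) {
        ToyConsumer consumer = active.get();
        if (consumer == null) {
            throw new IllegalStateException(
                "You must call one of receive*() methods before using doOnConsumer");
        }
        return fn.apply(consumer);
    }

    public static void main(String[] args) {
        ToyReceiver receiver = new ToyReceiver();

        // No active receive() yet, so the guard rejects the call.
        try {
            receiver.doOnConsumer(c -> c);
        } catch (IllegalStateException e) {
            System.out.println("Before receive(): " + e.getMessage());
        }

        // With an active receive(), pause and resume both reach the consumer.
        Runnable cancel = receiver.receive();
        receiver.doOnConsumer(c -> { c.pause(c.assignment()); return c; });
        receiver.doOnConsumer(c -> { c.resume(c.assignment()); return c; });
        System.out.println("Paused after resume: " + receiver.doOnConsumer(ToyConsumer::paused));

        // Once the subscription is cancelled, the guard fires again.
        cancel.run();
        try {
            receiver.doOnConsumer(c -> c);
        } catch (IllegalStateException e) {
            System.out.println("After cancel: " + e.getMessage());
        }
    }
}
```

If the real receiver behaves like this model, it may be worth checking that the KafkaReceiver passed to applyCircuitBreaker is the same instance whose receive*() Flux is subscribed, and that this Flux has not completed, errored, or been cancelled by the time the circuit breaker changes state.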

Upvotes: 0

Views: 12

Answers (0)
