Reputation: 61
Expected Behavior I am expecting the receiver to resume listening to the events from the topic work upon calling called consumer.resume(consumer.assignment());
Actual Behavior It is throwing an error You must call one of receive*() methods before using doOnConsumer"
My Code logic
public <T> void applyCircuitBreaker(CircuitBreaker circuitBreaker, KafkaReceiver<T, T> kafkaReceiver) {
circuitBreaker.getEventPublisher().onStateTransition((event) -> {
if (event.getStateTransition() == StateTransition.CLOSED_TO_OPEN) {
this.pauseReceiver(kafkaReceiver);
} else if (event.getStateTransition() == StateTransition.OPEN_TO_HALF_OPEN || event.getStateTransition() == StateTransition.HALF_OPEN_TO_CLOSED) {
this.resumeReceiver(kafkaReceiver);
}
});
}
private <T> void pauseReceiver(KafkaReceiver<T, T> kafkaReceiver) {
kafkaReceiver.doOnConsumer((consumer) -> {
consumer.pause(consumer.assignment());
return consumer;
}).doOnSuccess((success) -> {
log.info("Successful pause {}", success.paused());
}).doOnError((e) -> {
log.error("Error in pausing", e);
}).subscribe();
}
private <T> void resumeReceiver(KafkaReceiver<T, T> kafkaReceiver) {
kafkaReceiver.doOnConsumer((consumer) -> {
consumer.resume(consumer.assignment());
return consumer;
}).doOnSuccess((success) -> {
log.info("Successfully resumed kafka receiver {}", success.assignment());
}).doOnError((e) -> {
log.error("Error in resuming", e);
}).subscribe();
log.info("resumeReceiver: Kafka receiver resumed.");
}
Looking for some suggestions incase I am missing out anything over here.
Upvotes: 0
Views: 12