JDGuide
JDGuide

Reputation: 6525

How to add consumer to Flux publisher on first api call

I am using the below api code for returning the Flux<BiteEventsResponse> , but seems when I start my application and do the first API call from postman due to no consumer for the Flux publisher my first API call is getting stuck for infinite time. The events are getting emitted when I do the 2nd api call.

As I understood this Flux.create(eventsPublisher).share(); looks for any active consumer , as my application started and on the api call /bites/events triggered first time so no consumer. Or can we check the Flux publisher has no previous entry or consumer ? Any suggestion on this would be helpful ?

BiteEventsController.java

public class BiteEventsController {

    private final Flux<BiteEvent> biteEvent;
    private final int INTERVAL = 10;
    private final Flux<BiteEventsResponse> keepAlive = Flux.interval(Duration.ofSeconds(INTERVAL))
            .map(e -> new BiteEventsResponse("iam-alive", null));

    public BiteEventsController(BiteEventsPublisher eventsPublisher) {
        this.biteEvent = Flux.create(eventsPublisher).share();
    }

    @GetMapping(value = "/bites/events", produces = "text/event-stream;charset=UTF-8")
    public Flux<BiteEventsResponse> biteEvents(HttpServletRequest request) {        
        try {
            Flux<BiteEventsResponse> biteEvent = this.biteEvent.filter(
                    event -> (event.getOwnerId() != null ))
                    .map(event -> {                        
                        return new BiteEventsResponse("message", (BiteResponse) event.getSource());
                    });
            return Flux.merge(keepAlive, biteEvent);
        } catch (Exception ex) {
            log.error("{}", ex);
            return Flux.empty();
        }
    }

}

BitePublisher.java

public class BiteEventsPublisher implements Consumer<FluxSink<BiteEvent>> {

@Autowired
BiteEventsPublisherConfig biteEventConfig;

@EventListener
public void onBiteEvent(BiteEvent event) {
    biteEventConfig.biteEventBlockingQueue().offer(event);
}

@Override
public void accept(FluxSink<BiteEvent> sink) {
    while (true)
        try {
            BiteEvent event = biteEventConfig.biteEventBlockingQueue().take();
            sink.next(event);
        } catch (Exception ex) {
            log.error("Bite events publisher failed {}", ex);
            break;
        }
}

}

Upvotes: 0

Views: 70

Answers (0)

Related Questions