Reputation: 6525
I am using the below api code for returning the Flux<BiteEventsResponse>
, but seems when I start my application and do the first API call from postman due to no consumer for the Flux publisher my first API call is getting stuck for infinite time. The events are getting emitted when I do the 2nd api call.
As I understood this Flux.create(eventsPublisher).share();
looks for any active consumer , as my application started and on the api call /bites/events
triggered first time so no consumer. Or can we check the Flux publisher has no previous entry or consumer ? Any suggestion on this would be helpful ?
BiteEventsController.java
public class BiteEventsController {
private final Flux<BiteEvent> biteEvent;
private final int INTERVAL = 10;
private final Flux<BiteEventsResponse> keepAlive = Flux.interval(Duration.ofSeconds(INTERVAL))
.map(e -> new BiteEventsResponse("iam-alive", null));
public BiteEventsController(BiteEventsPublisher eventsPublisher) {
this.biteEvent = Flux.create(eventsPublisher).share();
}
@GetMapping(value = "/bites/events", produces = "text/event-stream;charset=UTF-8")
public Flux<BiteEventsResponse> biteEvents(HttpServletRequest request) {
try {
Flux<BiteEventsResponse> biteEvent = this.biteEvent.filter(
event -> (event.getOwnerId() != null ))
.map(event -> {
return new BiteEventsResponse("message", (BiteResponse) event.getSource());
});
return Flux.merge(keepAlive, biteEvent);
} catch (Exception ex) {
log.error("{}", ex);
return Flux.empty();
}
}
}
BitePublisher.java
public class BiteEventsPublisher implements Consumer<FluxSink<BiteEvent>> {
@Autowired
BiteEventsPublisherConfig biteEventConfig;
@EventListener
public void onBiteEvent(BiteEvent event) {
biteEventConfig.biteEventBlockingQueue().offer(event);
}
@Override
public void accept(FluxSink<BiteEvent> sink) {
while (true)
try {
BiteEvent event = biteEventConfig.biteEventBlockingQueue().take();
sink.next(event);
} catch (Exception ex) {
log.error("Bite events publisher failed {}", ex);
break;
}
}
}
Upvotes: 0
Views: 70