Reputation: 2046
I have a Flux endpoint that I provide to clients (subscribers) so they can receive updated prices. I'm testing it by opening the URL (http://localhost:8080/prices) in the browser, and it works fine. The problem I'm facing (I may be missing some concepts here) appears when I open this URL in several browsers: I expect all of them to receive the notification, but only one does. It behaves like a queue instead of a topic (as in message brokers). Is that the correct behavior?
    @GetMapping(value = "prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Collection<Price>>> prices() {
        return Flux.interval(Duration.ofSeconds(5))
                .map(sec -> pricesQueue.get())
                .filter(prices -> !prices.isEmpty())
                .map(prices -> ServerSentEvent.<Collection<Price>>builder()
                        .event("status-changed")
                        .data(prices)
                        .build());
    }
Upvotes: 1
Views: 378
Reputation: 72254
get isn't a standard queue operation, but this is almost certainly because your pricesQueue.get() method isn't idempotent. With every request (every browser window you open, in this case), you get a new flux that calls pricesQueue.get() every 5 seconds. If pricesQueue.get() just retrieves the latest item in the queue and leaves the queue untouched, all is good: every subscriber receives the same item, and the same item is displayed. But if it acts more like a poll(), removing the item from the queue after retrieving it, then only the first flux gets that value. The rest won't, because by that point the item has already been removed.
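The difference between a destructive and a non-destructive read can be sketched with plain JDK collections. This is only an illustration, assuming pricesQueue behaves roughly like a java.util.Queue; the class and method names (PeekVsPoll, readWithPoll, readWithPeek) are hypothetical and not from the question. Each method simulates two subscribers reading from the same queue during one tick.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration; the question's pricesQueue is not shown, so a
// plain java.util.Queue stands in for it here.
class PeekVsPoll {

    // Two subscribers each do a destructive read: poll() removes the head,
    // so the second reader finds the queue empty and gets null.
    static String[] readWithPoll(Queue<String> queue) {
        return new String[] { queue.poll(), queue.poll() };
    }

    // Two subscribers each do a non-destructive read: peek() leaves the head
    // in place, so both readers see the same element.
    static String[] readWithPeek(Queue<String> queue) {
        return new String[] { queue.peek(), queue.peek() };
    }

    public static void main(String[] args) {
        Queue<String> polledQueue = new ArrayDeque<>();
        polledQueue.add("EUR/USD 1.10");
        String[] polled = readWithPoll(polledQueue);
        System.out.println(polled[0] + " | " + polled[1]); // EUR/USD 1.10 | null

        Queue<String> peekedQueue = new ArrayDeque<>();
        peekedQueue.add("EUR/USD 1.10");
        String[] peeked = readWithPeek(peekedQueue);
        System.out.println(peeked[0] + " | " + peeked[1]); // EUR/USD 1.10 | EUR/USD 1.10
    }
}
```

If get() behaves like readWithPoll here, that would explain why only one browser window sees each update.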
You really have two main options here:
- Change your get() implementation (or implement a new method) so that it doesn't mutate the queue and only retrieves a value.
- Store Flux.interval(Duration.ofSeconds(5)).map(sec -> pricesQueue.get()).publish().autoConnect() somewhere as a field (let's say as queueFlux), then just return queueFlux.filter(prices -> !prices.isEmpty()).map(...) in your controller method.
Upvotes: 1
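A minimal sketch of the shared-flux option, assuming Reactor is on the classpath. SharedPriceFeed, offer, and drain are hypothetical names, and String stands in for the question's Price type. drain() deliberately empties the queue, mimicking a destructive get(), to show that a single shared source still delivers each batch to every subscriber.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import reactor.core.publisher.Flux;

// Hypothetical stand-in for the bean that owns pricesQueue in the question.
class SharedPriceFeed {

    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    // Created once and shared: the queue is drained once per tick, and the
    // resulting batch is multicast to every current subscriber.
    private final Flux<Collection<String>> queueFlux;

    SharedPriceFeed(Duration period) {
        this.queueFlux = Flux.interval(period)
                .map(tick -> drain())
                .publish()       // turn the cold interval into a multicast source
                .autoConnect();  // start ticking when the first subscriber arrives
    }

    void offer(String price) {
        pending.add(price);
    }

    // Destructive read: removes everything currently queued.
    private Collection<String> drain() {
        List<String> batch = new ArrayList<>();
        String price;
        while ((price = pending.poll()) != null) {
            batch.add(price);
        }
        return batch;
    }

    // What the controller method would build on: each caller gets a view of
    // the same shared source rather than its own interval.
    Flux<Collection<String>> prices() {
        return queueFlux.filter(batch -> !batch.isEmpty());
    }
}
```

In the controller, the ServerSentEvent mapping from the question would then be applied to prices() instead of to a fresh Flux.interval per request. How the shared source behaves while no clients are connected is worth checking against the Reactor documentation before relying on this.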