Spring Webflux endpoint working as a topic

I have an Flux endpoint that I provide to clients (subscribers) to receive updated prices. I'm testing it accessing the URL (http://localhost:8080/prices) though the browser and it works fine. The problem I'm facing (I'm maybe missing some concepts here) is when I open this URL in many browsers and I expect to receive the notification in all of them, but just one receives. It is working as a queue instead of a topic (like in message Brokers). Is that correct behavior?

@GetMapping(value = "prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Collection<Price>>> prices() {
return Flux.interval(Duration.ofSeconds(5))
        .map(sec -> pricesQueue.get())
        .filter(prices -> !prices.isEmpty())
        .map(prices -> ServerSentEvent.<Collection<Price>> builder()
                .event("status-changed")
                .data(prices)
                .build());
}

Upvotes: 1

Views: 378

Answers (1)

Michael Berry
Michael Berry

Reputation: 72254

get isn't a standard queue operation, but this is almost certainly because your pricesQueue.get() method isn't idempotent. With every request (with every browser window you open in this case), you'll get a new flux that calls pricesQueue.get() every 5 seconds. Now if pricesQueue.get() just retrieves the latest item in the queue and does nothing with it, all is good - all your subscribers receive the same item, and the same item is displayed. But if it acts more like a poll() where it removes the item in the queue after it's retrieved it, then only the first flux will get that value - the rest won't, as by that point it will have been removed.

You've really two main options here:

  • Change your get() implementation (or implement a new method) so that it doesn't mutate the queue, only retrieves a value.
  • Turn the flux into a hot flux. Store Flux.interval(Duration.ofSeconds(5)).map(sec -> pricesQueue.get()).publish().autoConnect() somewhere as a field (let's say as queueFlux), then just return queueFlux.filter(prices -> !prices.isEmpty()).map(...) in your controller method.

Upvotes: 1

Related Questions