lafual
lafual

Reputation: 775

Detecting an empty Flux Window before Webclient Post

With reference to my previous question Splitting a WebClient Post of a Streaming Flux into JSON Arrays , I was using;

myFlux
 .window(5)
 .flatMap(window -> client
  .post()
  .body(window, myClass.class)
  .exchange()
  .flatMap(response -> response.bodyToMono)
 )
 .subscribe();

This works fine. However, on a slow day, 5 messages make take a while to arrive and the window will not send anything until the window is full. So I switched to windowTimeout(5, Duration.ofSeconds(5)).

Now, if there is no data and the Duration is exceeded, the code is propagating an empty window which is causing an empty array to be posted.

How do I detect an empty window and not run the post?

Upvotes: 0

Views: 1797

Answers (1)

Phil Clay
Phil Clay

Reputation: 4534

Unfortunately, there is no way to know how many items will be emitted by a Flux without reading the entire Flux to completion.

Since your window size is relatively small, you could collect all the items emitted by the Flux into a List using .collectList(), and then check to see if the list is empty before sending the request.

myFlux
    .windowTimeout(5, Duration.ofSeconds(5))
    .flatMap(window ->
        // collect everything in the window into a list
        window.collectList()
             // ignore empty windows
            .filter(list -> !list.isEmpty())
             // send the request
            .flatMap(list -> client
                .post()
                .body(Flux.fromIterable(list), MyClass.class)
                .exchange()
                .flatMap(response -> response.bodyToMono(MyResponse.class))))

Upvotes: 2

Related Questions