Reputation: 775
With reference to my previous question Splitting a WebClient Post of a Streaming Flux into JSON Arrays , I was using;
myFlux
.window(5)
.flatMap(window -> client
.post()
.body(window, myClass.class)
.exchange()
.flatMap(response -> response.bodyToMono)
)
.subscribe();
This works fine. However, on a slow day, 5 messages make take a while to arrive and the window
will not send anything until the window
is full. So
I switched to windowTimeout(5, Duration.ofSeconds(5))
.
Now, if there is no data and the Duration
is exceeded, the code is propagating an empty window
which is causing an empty array to be posted.
How do I detect an empty window
and not run the post
?
Upvotes: 0
Views: 1797
Reputation: 4534
Unfortunately, there is no way to know how many items will be emitted by a Flux without reading the entire Flux to completion.
Since your window size is relatively small, you could collect all the items emitted by the Flux into a List
using .collectList()
, and then check to see if the list is empty before sending the request.
myFlux
.windowTimeout(5, Duration.ofSeconds(5))
.flatMap(window ->
// collect everything in the window into a list
window.collectList()
// ignore empty windows
.filter(list -> !list.isEmpty())
// send the request
.flatMap(list -> client
.post()
.body(Flux.fromIterable(list), MyClass.class)
.exchange()
.flatMap(response -> response.bodyToMono(MyResponse.class))))
Upvotes: 2