Jonas Pedersen
Jonas Pedersen

Reputation: 1054

Spring Reactor - Emit last item

I have an endpoint where I call a Spring spring service to get status of an order. The service returns and Enum with the values: Processing, Completed or Error.

@GetMapping(value = ["/{orderId}/state"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun orderState(@PathVariable("orderId") orderId: Long): Flux<OrderStateDetails> {
    return Flux.interval(Duration.ofSeconds(1))
        .take(30)
        .map { orderService.orderState(orderId) }
        .takeWhile { orderStateDetails -> orderStateDetails.state == OrderState.Procession }
}

I'm using a Flux to implement a low tech polling until state is no longer Processing and no longer than 30 seconds. I wan't the endpoint to return:

But the takeWhile is not emitting the last item Completed. I tried to add .also { orderService.orderState(orderId) } and .concatWith(Flux.just(orderService.orderState(orderId))) but that will make an extra unnecessary call and also does not emit the item. How can I make it emit the last items as well?

Upvotes: 0

Views: 366

Answers (1)

kerbermeister
kerbermeister

Reputation: 4251

Instead of using takeWhile() which does not give you the !result of your predicate, you should use takeUntil()

Here's a simple example in Java, but you can easily adapt it to Kotlin and your code:

Flux.just("Processing", "Processing", "Processing", "Completed")
                .delayElements(Duration.ofMillis(1000))
                .takeUntil(status -> status.equals("Completed"))
                .subscribe(System.out::println);

This will print the following output:

Processing
Processing
Processing
Completed

So I think this is what you want to achieve.

Upvotes: 1

Related Questions