Amitoj
Amitoj

Reputation: 423

Reactor Flux conditional emit

Is it possible to allow emitting values from a Flux conditionally based on a global boolean variable? I'm working with Flux delayUntil(...) but not able to fully grasp the functionality or my assumptions are wrong.

I have a global AtomicBoolean that represents the availability of a downstream connection and only want the upstream Flux to emit if the downstream is ready to process.

To represent the scenario, created a (not working) test sample

//Randomly generates a boolean value every 5 seconds
private Flux<Boolean> signalGenerator() {
    return Flux.range(1, Integer.MAX_VALUE)
            .delayElements(Duration.ofMillis(5000))
            .map(integer -> new Random().nextBoolean());
}

and

   Flux.range(1, Integer.MAX_VALUE)
            .delayElements(Duration.ofMillis(1000))
            .delayUntil(evt -> signalGenerator())     // ??  Only proceed when signalGenerator returns true
            .subscribe(System.out::println);

I have another scenario where a downstream process can accept only x messages a second. In the current non-reactive implementation we have a Semaphore of x permits and the thread is blocked if no more permits are available, with Semaphore permits resetting every second.

In both scenarios I want upstream Flux to emit only when there is a demand from the downstream process, and I do not want to Buffer.

Upvotes: 2

Views: 1566

Answers (1)

sinacetiner
sinacetiner

Reputation: 23

You might consider using Mono.fromRunnable() as an input to delayUntil() like below;

Helper class;

public class FluxCondition {

    CountDownLatch latch = new CountDownLatch(10); // it depends, might be managed somehow
    Runnable r = () -> { latch.await(); }

    public void lock() { Mono.fromRunnable(r) };
    public void release() { latch.countDown(); }
}
    

Usage;

FluxCondition delayCondition = new FluxCondition();
Flux.range(1, 10).delayUntil(o -> delayCondition.lock()).subscribe();
.....
delayCondition.release(); // shall call this for each element

I guess there might be a better solution by using sink.emitNext but this might also require a condition variable for controlling Flux flow.

According my understanding, in reactive programming, your data should be considered in every operator step. So it might be better for you to design your consumer as a reactive processor. In my case I had no chance and followed the way as I described above

Upvotes: 1

Related Questions