SvenR

Reputation: 49

reactor stream - overwrite old value, keep only latest

I am trying to implement a standard reactive case with Project Reactor: the producer is faster than the consumer. The consumer should never work with old values if newer ones are already available (for example, outdated stock prices are of no interest).

In my code example the producer generates a new value every 100 ms, but the consumer needs 500 ms to process one. Because several new values arrive while the consumer is still busy, only the newest value is of interest to the consumer/subscriber, not the outdated intermediate values.

With limitRate(1) I tried to request only one value at a time from the producer, and with onBackpressureLatest() I wanted to drop the intermediate values. Neither worked as desired.

What would be the correct solution?

@Test
void fluxTest(){
    
    Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {

        int i = ai.incrementAndGet();

        if (i > 10) {
            sink.complete();
        } else {
            System.out.println(Thread.currentThread()+": generate & emit value "+i);
            sink.next(i);
        }
        sleep(100);
        return ai;
    });

    flux
            .publishOn(Schedulers.parallel())
            .onBackpressureLatest()
            .limitRate(1)
            .subscribe(i -> {
                System.out.println(Thread.currentThread()+": Receive: " + i); // do something with generated and processed item
                sleep(500);
            });

    sleep(10000);
}

void sleep(int ms){
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Current result:

14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 2
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 3
Thread[parallel-1,5,main]: Receive: 4
Thread[parallel-1,5,main]: Receive: 5
Thread[parallel-1,5,main]: Receive: 6
Thread[parallel-1,5,main]: Receive: 7
Thread[parallel-1,5,main]: Receive: 8
Thread[parallel-1,5,main]: Receive: 9
Thread[parallel-1,5,main]: Receive: 10

Process finished with exit code 0

My expected result would be:

14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 5
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 10

Process finished with exit code 0

Upvotes: -1

Views: 439

Answers (1)

SvenR

Reputation: 49

Under "Latest overflow strategy with size 1 or any alternatives" I found a solution that works within the Flux stream by wrapping each value in Mono.just.

My code:

Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {

    int i = ai.incrementAndGet();

    if (i > 10) {
        sink.complete();
    } else {
        System.out.println(Thread.currentThread()+": generate & emit value "+i);
        sink.next(i);
    }
    sleep(100);
    return ai;
});

Disposable subscribe = flux
        .publishOn(Schedulers.parallel())
        .onBackpressureLatest()
        .flatMap(next ->
                Mono.just(next)
                        .subscribeOn(Schedulers.single()), 1, 1)
        .subscribe(i -> {
            System.out.println(Thread.currentThread() + ": Receive: " + i); // do something with generated and processed item
            sleep(500);
        });

// wait until the stream has completed, polling instead of spinning at full CPU
while (!subscribe.isDisposed()) {
    sleep(50);
}

Now the result looks as expected:

22:51:42.624 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[single-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[main,5,main]: generate & emit value 6
Thread[single-1,5,main]: Receive: 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[single-1,5,main]: Receive: 10

Process finished with exit code 0
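
To make the "keep only the latest" behaviour itself easier to see, here is a minimal plain-Java sketch that does not use Reactor at all. It assumes a single-slot mailbox in which the producer overwrites whatever the consumer has not yet picked up. The class name LatestSlot and its methods offer and poll are hypothetical names chosen for this illustration, and this is not how Reactor implements onBackpressureLatest internally.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-slot mailbox: a newer value overwrites an unconsumed older one. */
final class LatestSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    /** Producer side: store the value, discarding any value that was not taken yet. */
    void offer(T value) {
        slot.set(value);
    }

    /** Consumer side: take the newest value, or null if nothing new has arrived. */
    T poll() {
        return slot.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestSlot<Integer> latest = new LatestSlot<>();

        latest.offer(1);
        System.out.println("Receive: " + latest.poll()); // Receive: 1

        // The producer emits 2..5 while the consumer is still busy.
        for (int i = 2; i <= 5; i++) {
            latest.offer(i);
        }
        System.out.println("Receive: " + latest.poll()); // Receive: 5

        // Nothing new since the last poll.
        System.out.println("Receive: " + latest.poll()); // Receive: null
    }
}
```

The consumer only ever sees the value that was current at the moment it asked for one, which matches the expected output in the question: 1, then the newest value available after each processing pause.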

Upvotes: 0
