Reputation: 49
At the moment I am trying to implement a standard reactive case with Project Spring Reactor: The producer is faster than the consumer. The consumer should never work with old values if new ones are already available (example: outdated stock prices are not of interest).
In my code example I have a producer that generates a new value every 100ms. But the consumer needs 500ms for processing. Since between processing in the consumer already many several new values arise, only the newest values for the consumer/subscriber would be interesting for me and not the outdated intermediate values.
Per limitRate(1) I tried to request only one value at a time to the producer and per onBackPressureLatest() I wanted to ignore intermediate values. Both did not work as desired.
What would be the correct solution?
@Test
void fluxTest(){
Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {
int i = ai.incrementAndGet();
if (i > 10) {
sink.complete();
} else {
System.out.println(Thread.currentThread()+": generate & emit value "+i);
sink.next(i);
}
sleep(100);
return ai;
});
flux
.publishOn(Schedulers.parallel())
.onBackpressureLatest()
.limitRate(1)
.subscribe(i -> {
System.out.println(Thread.currentThread()+": Receive: " + i); // do something with generated and processed item
sleep(500);
});
sleep(10000);
}
void sleep(int ms){
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Current result:
14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 2
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 3
Thread[parallel-1,5,main]: Receive: 4
Thread[parallel-1,5,main]: Receive: 5
Thread[parallel-1,5,main]: Receive: 6
Thread[parallel-1,5,main]: Receive: 7
Thread[parallel-1,5,main]: Receive: 8
Thread[parallel-1,5,main]: Receive: 9
Thread[parallel-1,5,main]: Receive: 10
Process finished with exit code 0
My expected result would be:
14:26:40.019 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[parallel-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[parallel-1,5,main]: Receive: 5
Thread[main,5,main]: generate & emit value 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[parallel-1,5,main]: Receive: 10
Process finished with exit code 0
Upvotes: -1
Views: 439
Reputation: 49
Under
Latest overflow strategy with size 1 or any alternatives
I have discovered a solution which works within the Flux Stream via Mono.just creation.
My code:
Flux<Integer> flux = Flux.generate(AtomicInteger::new, (ai, sink) -> {
int i = ai.incrementAndGet();
if (i > 10) {
sink.complete();
} else {
System.out.println(Thread.currentThread()+": generate & emit value "+i);
sink.next(i);
}
sleep(100);
return ai;
});
Disposable subscribe = flux
.publishOn(Schedulers.parallel())
.onBackpressureLatest()
.flatMap(next ->
Mono.just(next)
.subscribeOn(Schedulers.single()), 1, 1)
.subscribe(i -> {
System.out.println(Thread.currentThread() + ": Receive: " + i); // do something with generated and processed item
sleep(500);
});
while(!subscribe.isDisposed());
Now the results looks as expected:
22:51:42.624 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
Thread[main,5,main]: generate & emit value 1
Thread[single-1,5,main]: Receive: 1
Thread[main,5,main]: generate & emit value 2
Thread[main,5,main]: generate & emit value 3
Thread[main,5,main]: generate & emit value 4
Thread[main,5,main]: generate & emit value 5
Thread[main,5,main]: generate & emit value 6
Thread[single-1,5,main]: Receive: 6
Thread[main,5,main]: generate & emit value 7
Thread[main,5,main]: generate & emit value 8
Thread[main,5,main]: generate & emit value 9
Thread[main,5,main]: generate & emit value 10
Thread[single-1,5,main]: Receive: 10
Process finished with exit code 0
Upvotes: 0