Consider the following code:
public static void main(String[] args) throws Exception {
    UnicastProcessor<Integer> processor = UnicastProcessor.create();
    Flux<Integer> source = Flux
            .just(1, 2, 3, 4, 5)
            .delayElements(Duration.ofMillis(1))
            .repeat();
    source.subscribe(processor);
    processor.subscribe(i -> {
        System.out.println("i = " + i);
        System.out.println("processor.size() = " + processor.size());
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            System.out.println(e);
        }
    });
    Thread.currentThread().join();
}
You can see that the delay of the source is much smaller than the delay of the subscriber (which sleeps for 1 second).
Questions:
1. Is this code might get out of memory?
2. I always see processor.size() = 0. Why? I expected the queue to be filled (it's unbounded).
3. If I remove delayElements command, I see nothing. Why?
Thanks!
Upvotes: 1
Views: 2360
Reputation: 72304
Taking a look at the docs for create() on UnicastProcessor, it says:
Create a new UnicastProcessor that will buffer on an internal queue in an unbounded fashion.
(Emphasis mine.)
Specifically, this means that if its subscriber hasn't yet requested data, it will attempt to buffer any data that's available. It doesn't know that the Flux you have there is infinite, so it sits there buffering as much of that data as it can.
If I remove delayElements command, I see nothing. Why?
With the delayElements() call in there, the Flux waits a millisecond before it starts emitting, which gives your subscriber the time it needs to subscribe. At that point the processor doesn't try to buffer anything and just sends each value straight through to the subscriber. Without it, the Flux emits values before the subscriber has subscribed, and so you get the infinite buffering described above.
In the same vein, if you comment out the repeat() call on the Flux, it will also work fine, because the processor will then just buffer those 5 values until the subscribe call is made.
Is this code might get out of memory?
In the case where the infinite buffering occurs, then yes - eventually there won't be enough memory for the processor to hold any more data.
I always see processor.size() = 0. Why?
Because the processor isn't holding any values by the time it has published one to your subscriber; it doesn't need to buffer them at that point. If you called size() on the processor while it was attempting to buffer those values infinitely, you'd see it constantly rise.
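To make the two cases concrete, here is a rough, single-threaded sketch in plain Java. BufferModel and its methods are hypothetical names invented for this illustration; it is not Reactor's UnicastProcessor and it ignores threading and backpressure entirely. It only models the idea that a value is taken off the queue before it is handed to the subscriber, and that values pile up when nobody has subscribed yet.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical, simplified model of a single-subscriber buffer (not Reactor code). */
public class BufferModel {
    private final Queue<Integer> queue = new ArrayDeque<>(); // grows without bound
    private Consumer<Integer> subscriber;                    // null until someone subscribes

    /** Called by the source for every emitted value. */
    public void onNext(int value) {
        queue.add(value);
        drain();
    }

    /** Attaches the subscriber and replays anything buffered so far. */
    public void subscribe(Consumer<Integer> consumer) {
        subscriber = consumer;
        drain();
    }

    public int size() {
        return queue.size();
    }

    /** Polls each value off the queue BEFORE passing it to the subscriber. */
    private void drain() {
        if (subscriber == null) {
            return; // nobody is listening yet, so values stay buffered
        }
        Integer next;
        while ((next = queue.poll()) != null) {
            subscriber.accept(next);
        }
    }

    /** Sizes observed inside the consumer when it subscribes before any emission. */
    static List<Integer> sizesSeenWhenSubscribedFirst() {
        BufferModel model = new BufferModel();
        List<Integer> sizes = new ArrayList<>();
        model.subscribe(v -> sizes.add(model.size()));
        for (int i = 1; i <= 5; i++) {
            model.onNext(i);
        }
        return sizes;
    }

    /** Queue size after n emissions when no subscriber was ever attached. */
    static int sizeWithoutSubscriber(int n) {
        BufferModel model = new BufferModel();
        for (int i = 1; i <= n; i++) {
            model.onNext(i);
        }
        return model.size();
    }

    public static void main(String[] args) {
        System.out.println(sizesSeenWhenSubscribedFirst()); // prints [0, 0, 0, 0, 0]
        System.out.println(sizeWithoutSubscriber(1000));    // prints 1000
    }
}
```

In this model the subscriber always sees a size of 0, because each value has already left the queue when the callback runs, while an emitter that runs with no subscriber attached makes the size grow with every value.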
Upvotes: 1