Reputation: 15752
I have a producer which emits items periodically and a consumer which is sometimes quite slow. It is important that the consumer only works with recent items. I thought onBackpressureLatest() is the perfect solution for this problem. So I wrote the following test code:
PublishProcessor<Integer> source = PublishProcessor.create();
source
.onBackpressureLatest()
.observeOn(Schedulers.from(Executors.newCachedThreadPool()))
.subscribe(i -> {
System.out.println("Consume: " + i);
Thread.sleep(100);
});
for (int i = 0; i < 10; i++) {
System.out.println("Produce: " + i);
source.onNext(i);
}
I expected it to log something like:
Produce: 0
...
Produce: 9
Consume: 0
Consume: 9
Instead, I get
Produce: 0
...
Produce: 9
Consume: 0
Consume: 1
...
Consume: 9
onBackpressureLatest() and onBackpressureDrop() do both not have any effect. Only onBackpressureBuffer(i) causes an exception.
I use rxjava 2.1.9. Any ideas what the problem/my misunderstanding could be?
Upvotes: 3
Views: 1243
Reputation: 81588
I think the following is supposed to work but I'm not entirely sure
PublishProcessor<Integer> source = PublishProcessor.create();
source
.onBackpressureLatest()
.switchMap(item -> Flowable.just(item)) // <--
.observeOn(
Schedulers.from(Executors.newCachedThreadPool()))
.subscribe(i -> {
System.out.println("Consume: " + i);
Thread.sleep(100);
});
Upvotes: 0
Reputation: 70017
observeOn
has an internal buffer (default 128 elements) that will pick up all source items easily immediately, thus the onBackpressureLatest
is always fully consumed.
Edit:
The smallest buffer you can create is 1 which should provide the required pattern:
source.onBackpressureLatest()
.observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
.subscribe(v -> { /* ... */ });
(the earlier delay
+ rebatchRequest
combination is practically equivalent to this).
Upvotes: 6