Joel

Reputation: 15752

Only consume latest item with onBackpressureLatest()

I have a producer that emits items periodically and a consumer that is sometimes quite slow. It is important that the consumer only works with recent items. I thought onBackpressureLatest() would be the perfect solution for this problem, so I wrote the following test code:

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .onBackpressureLatest()
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()))
        .subscribe(i -> {
            System.out.println("Consume: " + i);
            Thread.sleep(100);
        });

for (int i = 0; i < 10; i++) {
    System.out.println("Produce: " + i);
    source.onNext(i);
}

I expected it to log something like:

Produce: 0
...
Produce: 9
Consume: 0
Consume: 9

Instead, I get

Produce: 0
...
Produce: 9
Consume: 0
Consume: 1
...
Consume: 9

Neither onBackpressureLatest() nor onBackpressureDrop() has any effect. Only onBackpressureBuffer(i) causes an exception.

I use RxJava 2.1.9. Any ideas what the problem, or my misunderstanding, could be?

Upvotes: 3

Views: 1243

Answers (2)

EpicPandaForce

Reputation: 81588

I think the following is supposed to work, but I'm not entirely sure:

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .onBackpressureLatest()
        .switchMap(item -> Flowable.just(item)) // <--
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()))
        .subscribe(i -> {
            System.out.println("Consume: " + i);
            Thread.sleep(100);
        });

Upvotes: 0

akarnokd

Reputation: 70017

observeOn has an internal buffer (128 elements by default) that immediately picks up all of the source items. onBackpressureLatest therefore always has enough downstream demand, never has to drop anything, and every item is delivered.

Edit:

The smallest buffer size you can configure is 1, which should provide the required pattern:

source.onBackpressureLatest()
      .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
      .subscribe(v -> { /* ... */ });

(The earlier delay + rebatchRequests combination is practically equivalent to this.)
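
To see why the buffer size matters, here is a rough, single-threaded model in plain Java. It does not use RxJava and does not reproduce its internals. The class LatestSketch and the method simulate are hypothetical names made up for this illustration. It assumes the producer emits all items before the consumer finishes its first one, and that demand is replenished one item at a time (the real observeOn replenishes in batches).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model, not RxJava code: a "latest" slot feeding a bounded buffer.
class LatestSketch {

    // Replays `items` fast emissions (0, 1, 2, ...) against a consumer that
    // only starts draining after the producer has finished emitting.
    static List<Integer> simulate(int items, int bufferSize) {
        Queue<Integer> buffer = new ArrayDeque<>(); // stands in for observeOn's internal queue
        Integer latest = null;                      // stands in for the single slot kept by onBackpressureLatest
        int requested = bufferSize;                 // demand the buffer announces up front

        // Producer phase: items pass through while there is demand,
        // otherwise each new item overwrites the "latest" slot.
        for (int i = 0; i < items; i++) {
            if (requested > 0) {
                buffer.add(i);
                requested--;
            } else {
                latest = i;
            }
        }

        // Consumer phase: each consumed item frees one unit of demand,
        // which pulls the retained "latest" item (if any) into the buffer.
        List<Integer> consumed = new ArrayList<>();
        while (!buffer.isEmpty()) {
            consumed.add(buffer.poll());
            requested++;
            if (latest != null && requested > 0) {
                buffer.add(latest);
                latest = null;
                requested--;
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(10, 128)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(simulate(10, 1));   // [0, 9]
    }
}
```

With a buffer of 128, all ten items fit into the up-front demand, so nothing is ever dropped. With a buffer of 1, only item 0 gets through immediately and item 9 is the one left in the slot when the consumer asks again, which matches the output the question expected.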

Upvotes: 6
