doe

Reputation: 991

`distinctUntilChanged`: how to allow the same item again once onNext has been called

I have an observable that continuously emits items, and I need to process each one (the processing function takes some time). While an item is being processed, if another item with the same value is emitted, I can ignore it, because the same item is already in progress. But once the current item has been processed (and onNext has been called), a later request with the same value should be allowed through. I used the distinctUntilChanged operator, but as far as I can see it rejects an item that equals the previous one even after the previous item has finished processing and onNext has been called.

I have a sample to demonstrate the issue

I have a class

class User {
    String id;
    String name;

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

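    // Two users are considered equal when their ids match.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof User)) return false;
        return id.equals(((User) obj).id);
    }

    // Keep hashCode consistent with equals.
    @Override
    public int hashCode() {
        return id.hashCode();
    }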

    @Override
    public String toString() {
        return name;
    }
}

And an observable (Subject)

Subject<User> mSubject = PublishSubject.create();

And my Observable chain is

mSubject
    .doOnNext(i -> Log.d(TAG, "emitted: " + i))
    .observeOn(Schedulers.io())
    .distinctUntilChanged()
    .map(user -> {
        Log.d(TAG, "processing " + user);
        Thread.sleep(5000); // processing takes 5 seconds
        return user;
    })
    .subscribe(user -> Log.d(TAG, "onNext: " + user.name));

And I emit values like this

    for (int i = 0; i < 20; i++) {
        Thread.sleep(1000);            
        mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
    }

The result is

emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19

All the objects are equal here (the equals method compares id). As you can see, User 0 is taken first and needs 5 seconds to process. During this time I can ignore incoming items, but after `onNext: User 0` the same user should be allowed again. distinctUntilChanged does not allow this, because it is still holding the last value, which is equal to the new one. How can I do this? I hope my question is clear.

Upvotes: 3

Views: 4380

Answers (2)

lelloman

Reputation: 14183

You can achieve this with a Flowable and the right BackpressureStrategy. The problem is that you are not setting the buffer size when calling observeOn. You could try this (in Kotlin):

Observable.interval(100, TimeUnit.MILLISECONDS)
    .doOnNext { println("emitting $it") }
    .toFlowable(BackpressureStrategy.LATEST)
    .observeOn(Schedulers.io(), false, 1)
    .subscribeOn(Schedulers.io())
    .subscribe {
        println("consuming $it")
        Thread.sleep(500)
    }

The output would look like this:

emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14

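When you call observeOn(Scheduler) without a buffer size, the default back-pressure buffer size is 128, so up to 128 items are queued for the consumer before anything is dropped. That is why the sample above passes a buffer size of 1.

If you change the buffer size in the sample above to, say, 3, you would get: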

emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...
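
The keep-only-the-latest behaviour that the sample above gets from BackpressureStrategy.LATEST with a buffer size of 1 can be pictured with a one-slot mailbox. This is only a simplified plain-Java model for illustration, not how RxJava implements it; the class name LatestSlot and its methods are made up for this sketch.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical one-slot mailbox: a simplified model of "keep only the latest". */
final class LatestSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    /** Producer side: overwrite whatever is waiting, so older unconsumed items are lost. */
    void offer(T item) {
        slot.set(item);
    }

    /** Consumer side: take the waiting item, or null if nothing arrived since the last take. */
    T poll() {
        return slot.getAndSet(null);
    }
}
```

A slow consumer that calls poll() only after finishing its current item sees just the most recent value offered in the meantime, which matches the consuming 0, 5, 10 pattern in the first output.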

Upvotes: 2

Bob Dalgleish

Reputation: 8227

You could use the groupBy() operator to separate requests by User. In each observable, you could arrange to only handle the latest emission.

mSubject
  .doOnNext(i -> Log.d(TAG, "emitted: " + i))
  .observeOn(Schedulers.io())
  .groupBy(user -> user.id) // group by id, the field that equals() compares
  .flatMap(userObservable -> userObservable
      .onBackpressureDrop() // note: in RxJava 2 this operator is defined on Flowable
      .map(user -> {
        Log.d(TAG, "processing " + user);
        Thread.sleep(5000); // processing takes 5 seconds
        return user;
      }))
  .subscribe(user -> Log.d(TAG, "onNext: " + user.name));

The groupBy() operator creates one observable per user, so a new user gets a new observable. Each user's emissions arrive on that user's own observable, and onBackpressureDrop() drops an emission if downstream is not ready to accept it.
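
The idea of dropping a user while that same user is still being processed can also be sketched without any Rx operators, by tracking which ids are currently in flight. This is a minimal plain-Java sketch under assumptions: InFlightFilter, tryStart and finish are hypothetical names, not part of RxJava or of the code above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: rejects a key while work for the same key is still running. */
final class InFlightFilter<K> {
    // Thread-safe set of the keys that are currently being processed.
    private final Set<K> inFlight = ConcurrentHashMap.newKeySet();

    /** Returns true if the caller may start processing this key now. */
    boolean tryStart(K key) {
        return inFlight.add(key); // add() returns false when the key is already present
    }

    /** Marks processing as finished, so the same key is accepted again. */
    void finish(K key) {
        inFlight.remove(key);
    }
}
```

One possible wiring, which is an assumption and not taken from the answer: call tryStart(user.id) in a filter step before observeOn, and call finish(user.id) once the slow map step has returned. A duplicate arriving during the 5 seconds is then rejected, while one arriving afterwards passes.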

Upvotes: 0
