Reputation: 991
I have an observable which continuously emits items ,and I need to process each one (the process function takes some time). So, meantime while processing an Item, if another item emits with the same value,
I can ignore it, since the same is already in progress. But once the current item is processed (and called onNext). and later if the same request comes, I should allow it.
I used the distinctUntildChanged
operator, but what I can see is , it will not allow if the current item is same as the last one, even if the last item completed processing and called onNext
.
I have a sample to demonstrate the issue
I have a class
class User {
String id;
String name;
public User(String id, String name) {
this.id = id;
this.name = name;
}
@Override
public boolean equals(Object obj) {
User obj1 = (User) obj;
return id.equals(obj1.id);
}
@Override
public String toString() {
return name;
}
}
And an observable (Subject)
Subject<User> mSubject = PublishSubject.create();
And my Observable chain is
mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.distinctUntilChanged()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
}).subscribe(user -> Log.d(TAG, "onNext: " + user.name));
And I emit values like this
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
}
The result is
emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19
All the objects are same here (equals method checks id). As you can see it took user0
first time, and will take 5 seconds to process, during this time I can ignore incoming items,
but after that onNext: User 0
I should allow the same user request, but distinctUntilChanged
does not allow since its hoding the last value eqal to the same user, How can I do this?
Hope my question is clear.
Upvotes: 3
Views: 4380
Reputation: 14183
So you can achieve this with a Flowable
and the right BackpressureStrategy
. The problem is that you are not setting the buffer size when doing observeOn
. You could try this (Kotlin though):
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext { println("emitting $it") }
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.io(), false,1)
.subscribeOn(Schedulers.io())
.subscribe {
println("consuming $it")
Thread.sleep(500)
}
The output would look like this:
emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14
When you call observeOn(Scheduler)
the default buffer size for the back-pressure should be 128, if I'm not mistaken.
You can try by changing the buffer size in the sample above to, say, 3. You would get:
emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...
Upvotes: 2
Reputation: 8227
You could use the groupBy()
operator to separate requests by User
. In each observable, you could arrange to only handle the latest emission.
mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.groupBy( user -> user )
.flatMap( userObserverable -> userObservable
.onBackpressureDrop()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
})
.subscribe(user -> Log.d(TAG, "onNext: " + user.name));
The groupBy()
operator creates one observable for each user. A new user will have a new observable created for them. Each user will be emitted on its own observable, and the onBackpressureDrop()
will drop the user if downstream is not accepting emissions.
Upvotes: 0