Reputation: 507
In my Android code, two views need the same data from the database at the same time. The data is provided as an io.reactivex.Flowable. One view needs the plain data, while the other applies a mapping function that involves some I/O and takes a while.
The problem is that both subscribers get the data only after the mapping function has terminated. This causes one of the views to be blank while the other one is loading.
Flowable<Integer> rxRoomSourceFlowable = Flowable.fromArray(1, 2, 3).share();

rxRoomSourceFlowable
    .map(integer -> {
        // Some long running operation
        Thread.sleep(5000);
        return integer + "!";
    })
    .subscribeOn(Schedulers.io())
    .subscribe(str -> System.out.println("Mapped subscriber: " + str));

rxRoomSourceFlowable
    .subscribeOn(Schedulers.io())
    .subscribe(integer -> System.out.println("Plain subscriber: " + integer));
The output:
Mapped subscriber: 1!
Plain subscriber: 1
Mapped subscriber: 2!
Plain subscriber: 2
Mapped subscriber: 3!
Plain subscriber: 3
As you can see, the plain subscriber is notified only after the long-running mapping function has terminated, even though the mapped result is completely irrelevant to it. The order of the calls to subscribe() doesn't matter because the subscriptions happen asynchronously anyway.
How can I ensure that the plain subscriber is notified as soon as the data is available, without waiting for the mapping function? Is there a way to transform the data inside the Flowable other than map()?
Upvotes: 2
Reputation: 69997
share() dispatches events in lockstep, so if one of the subscribers is delayed, the other one won't get its items in time. You have to move the offending subscriber off to another thread and let share() progress:
Flowable<Integer> rxRoomSourceFlowable =
    Flowable.fromArray(1, 2, 3)
        // Emit on an io() thread; share() multicasts to all subscribers
        .subscribeOn(Schedulers.io())
        .share();

rxRoomSourceFlowable
    // Hand items over to another io() thread so the slow map() no longer holds up share()
    .observeOn(Schedulers.io())
    .map(integer -> {
        // Some long running operation
        Thread.sleep(5000);
        return integer + "!";
    })
    .subscribe(str -> System.out.println("Mapped subscriber: " + str));

rxRoomSourceFlowable
    .subscribe(integer -> System.out.println("Plain subscriber: " + integer));
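To see why the thread hand-off helps, here is a rough plain-Java analogy of the two situations. It is only a sketch and not how RxJava implements share() or observeOn(): the names LockstepDemo, dispatch, offload and run are made up for illustration, and the 5-second sleep is shortened to 200 ms. dispatch delivers each item to every consumer in turn on one thread (the lockstep case), while offload queues the slow consumer's work on a single worker thread so the delivery loop can move on right away.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a multicasting source; not an RxJava class.
class LockstepDemo {

    // Delivers every item to each consumer in turn on the calling thread,
    // so a slow consumer delays all consumers that come after it.
    static void dispatch(int[] items, List<Consumer<Integer>> consumers) {
        for (int item : items) {
            for (Consumer<Integer> consumer : consumers) {
                consumer.accept(item);
            }
        }
    }

    // Wraps a consumer so that accept() only queues the work on a worker
    // thread and returns immediately (conceptually similar to a thread hop).
    static Consumer<Integer> offload(Consumer<Integer> slow, ExecutorService worker) {
        return item -> worker.execute(() -> slow.accept(item));
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the scenario once and returns the messages in arrival order.
    static List<String> run(boolean offloadSlow) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor();

        Consumer<Integer> mapped = item -> {
            sleep(200); // stands in for the long running mapping function
            log.add("Mapped subscriber: " + item + "!");
        };
        Consumer<Integer> plain = item -> log.add("Plain subscriber: " + item);

        List<Consumer<Integer>> consumers = new ArrayList<>();
        consumers.add(offloadSlow ? offload(mapped, worker) : mapped);
        consumers.add(plain);

        dispatch(new int[] {1, 2, 3}, consumers);

        // Wait for any queued slow work before reading the log.
        worker.shutdown();
        try {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println("Lockstep:");
        run(false).forEach(System.out::println);
        System.out.println("Slow consumer offloaded:");
        run(true).forEach(System.out::println);
    }
}
```

In the lockstep run the lines alternate exactly as in the question's output. With the slow consumer offloaded, the three "Plain subscriber" lines should arrive first, since the delivery loop finishes long before the first 200 ms sleep ends.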
Upvotes: 2