Reputation: 2264
I have a Button from which I create an Observable<OnClickEvent>.
When the button is clicked, I wish to fetch a file from the network, but I run into issues regarding networking and threads.
This example throws android.os.NetworkOnMainThreadException:
Observable<OnClickEvent> networkButtonObservable = ViewObservable.clicks(testNetworkButton);
networkButtonObservable
    .map(new Func1<OnClickEvent, List<String>>() {
        @Override
        public List<String> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponse();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });
So I try from another thread.
The following throws rx.exceptions.OnErrorNotImplementedException: Observers must subscribe from the main UI thread, but was Thread[RxNewThreadScheduler-1,5,main]:
networkButtonObservable
    .subscribeOn(Schedulers.newThread())
    .map(new Func1<OnClickEvent, List<String>>() {
        @Override
        public List<String> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponse();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });
OK, now I try with a .debounce() at the start:
networkButtonObservable
    .debounce(10, TimeUnit.MILLISECONDS)
    .map(new Func1<OnClickEvent, List<String>>() {
        @Override
        public List<String> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponse();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });
And this succeeds.
Obviously I do not want to add delays to my code, so I am trying to figure out what's going on, thread-wise. Why doesn't the first example also execute the code inside the .map() on a background thread?
Or what am I missing here?
--- Update
I changed my TestAPI to return an Observable and changed the first call on networkButtonObservable to .flatMap(). This also works, but I still don't know why the original approach using .map() fails.
networkButtonObservable
    .flatMap(new Func1<OnClickEvent, Observable<?>>() {
        @Override
        public Observable<?> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponseObservable();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });
Upvotes: 3
Views: 2643
Reputation: 69997
I'm not an Android expert, but based on the error messages, I think you need to bounce the value between the main thread and a background thread. Android examples usually show adding a subscribeOn/observeOn pair to the stream:
Observable.just(1)
.map(v -> doBackgroundWork())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> {});
but in those cases, the 'source' is usually a cold observable that you control.
In your question, the source is a hot Observable with a specific requirement: you must subscribe to it on the main thread. Yet you need to make the network call on a background thread and then show the results on the main thread.
In this case, you can use observeOn multiple times:
networkButtonObservable
.subscribeOn(AndroidSchedulers.mainThread()) // just in case
.observeOn(Schedulers.io())
.map(v -> TestAPI.getTestService().fetchTestResponse())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> updateGUI(v));
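To make the thread hops concrete without needing an Android device, here is a rough plain-Java analogy. It does not use RxJava at all: two single-thread executors stand in for Schedulers.io() and the main thread, and CompletableFuture stages stand in for the two observeOn calls. The class name ThreadHopSketch and the thread names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical analogy only: executors play the role of Rx schedulers.
final class ThreadHopSketch {

    /** Returns the names of the threads on which the "network" and "UI" stages ran. */
    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                .completedFuture("click")            // the event originates on the caller's thread
                .thenApplyAsync(click -> {           // roughly: observeOn(io) followed by map(...)
                    trace.add(Thread.currentThread().getName());
                    return "response for " + click;  // stand-in for the blocking network call
                }, io)
                .thenAcceptAsync(response ->         // roughly: observeOn(mainThread) + subscribe(...)
                    trace.add(Thread.currentThread().getName()), ui)
                .join();                             // wait here only so the demo can return the trace
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [io-thread, ui-thread]
    }
}
```

Each stage runs on the executor named just before it, which is the same idea as placing an observeOn immediately ahead of the operator whose thread you want to change.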
I think fetchTestResponseObservable has its own subscribeOn or observeOn applied to it, so it doesn't throw the network exception.
I'd also like to mention that using multiple subscribeOn calls is functionally equivalent to using only the one closest to the emitting source, but technically the extra ones hog threading resources that go unused. Using multiple observeOn calls in a stream, however, is meaningful because they let you 'pipeline' the stream processing between threads.
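The point about stacked subscribeOn calls can be illustrated with a toy model. This is not RxJava's implementation, only a simplified sketch of the idea: subscribeOn moves the upstream subscribe() call onto an executor, so when two are stacked, the source ends up emitting on the one nearest to it, and the outer executor is used only as a relay. The names SubscribeOnModel, Source, "near-source" and "far-from-source" are invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Toy model, not the real library: shows why only the innermost subscribeOn matters.
final class SubscribeOnModel {

    /** Minimal stand-in for an observable: pushes values to whoever subscribes. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Models subscribeOn: the upstream subscribe() call is moved onto the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Returns the name of the thread on which the source actually emitted. */
    static String emittingThread() {
        ExecutorService near = Executors.newSingleThreadExecutor(r -> new Thread(r, "near-source"));
        ExecutorService far = Executors.newSingleThreadExecutor(r -> new Thread(r, "far-from-source"));
        try {
            // The source emits the name of whatever thread it was subscribed on.
            Source<String> source = observer -> observer.accept(Thread.currentThread().getName());
            CompletableFuture<String> seen = new CompletableFuture<>();
            // The outer wrapper only relays the subscribe() call to the inner one.
            subscribeOn(subscribeOn(source, near), far).subscribe(seen::complete);
            return seen.join();
        } finally {
            near.shutdown();
            far.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThread()); // prints near-source
    }
}
```

In this model the "far-from-source" thread is created and briefly occupied only to forward the subscription, which mirrors the remark about unused threading resources.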
Upvotes: 10