xorgate

Reputation: 2264

RxAndroid ViewObservable NetworkOnMainThreadException

I have a Button from which I create an Observable<OnClickEvent>.

When the button is clicked, I wish to fetch a file from the network, but I run into issues regarding networking and threads.

This example throws android.os.NetworkOnMainThreadException:

Observable<OnClickEvent> networkButtonObservable = ViewObservable.clicks(testNetworkButton);
networkButtonObservable
    .map(new Func1<OnClickEvent, List<String>>() {
        @Override
        public List<String> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponse();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });

So I tried subscribing on another thread.

The following throws rx.exceptions.OnErrorNotImplementedException: Observers must subscribe from the main UI thread, but was Thread[RxNewThreadScheduler-1,5,main]:

networkButtonObservable
    .subscribeOn(Schedulers.newThread())
    .map(new Func1<OnClickEvent, List<String>>() {
        @Override
        public List<String> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponse();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });

OK. Next I tried adding a .debounce() at the start:

networkButtonObservable
    .debounce(10, TimeUnit.MILLISECONDS)
    .map(new Func1<OnClickEvent, List<String>>() {
        @Override
        public List<String> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponse();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });

And this succeeds.

Obviously I do not like to add delays to my code, so I am trying to figure out what's going on, thread-wise. Why is the first example not also executing the code inside the .map() in a background thread?

Or what am I missing here?

--- Update

I changed my TestAPI to return an Observable, and changed the first operator on networkButtonObservable from .map() to .flatMap(). This also works. But I still don't know why the original version using .map() fails.

networkButtonObservable
    .flatMap(new Func1<OnClickEvent, Observable<?>>() {
        @Override
        public Observable<?> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponseObservable();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            Log.w("Final result: " + o);
        }
    });

Upvotes: 3

Views: 2643

Answers (1)

akarnokd

Reputation: 69997

I'm not an expert in Android, but based on the error messages, I think you need to bounce the value between the main thread and a background thread. Android examples usually add a subscribeOn/observeOn pair to the stream:

Observable.just(1)
    .map(v -> doBackgroundWork())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(v -> {});

but in these cases, the 'source' is usually a cold Observable that you control.

In your question, the source is a hot Observable with specific requirements that you need to subscribe on the main thread, yet you need to do a network call on a background thread and then show the results on the main thread.

In this case, you can use observeOn multiple times:

networkButtonObservable
    .subscribeOn(AndroidSchedulers.mainThread())            // just in case
    .observeOn(Schedulers.io())                             // hop to a background thread
    .map(v -> TestAPI.getTestService().fetchTestResponse()) // network call runs off the main thread
    .observeOn(AndroidSchedulers.mainThread())              // hop back to the main thread
    .subscribe(v -> updateGUI(v));
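
To make the two hops concrete without an Android or RxJava dependency, here is a minimal plain-Java sketch of the same shape. It is only an analogy: the two single-thread executors named "fake-main" and "fake-io" and the class name ThreadHopSketch are hypothetical stand-ins for AndroidSchedulers.mainThread() and Schedulers.io(), and CompletableFuture plays the role of the stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {

    /** Runs click -> fetch -> show and returns which thread ran each stage. */
    public static List<String> run() {
        // Hypothetical stand-ins for the main-thread and io schedulers.
        ExecutorService main = Executors.newSingleThreadExecutor(task -> new Thread(task, "fake-main"));
        ExecutorService io = Executors.newSingleThreadExecutor(task -> new Thread(task, "fake-io"));
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<Void> done = CompletableFuture
                // The "click" is produced on the main thread.
                .runAsync(() -> trace.add("click on " + Thread.currentThread().getName()), main)
                // First hop: the blocking work runs on the io thread.
                .thenApplyAsync(ignored -> {
                    trace.add("fetch on " + Thread.currentThread().getName());
                    return "response";
                }, io)
                // Second hop: the result is consumed back on the main thread.
                .thenAcceptAsync(response ->
                    trace.add("show " + response + " on " + Thread.currentThread().getName()), main);
            done.join(); // wait for the whole chain to finish
            return new ArrayList<>(trace);
        } finally {
            main.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each stage runs on whichever executor the preceding hop names, which is the role each observeOn plays for the operators below it in the stream above.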

I think fetchTestResponseObservable has its own subscribeOn or observeOn applied to it so it doesn't throw the network exception.

Also, using multiple subscribeOn calls is functionally equivalent to using only the one closest to the emitting source, but the extra calls still tie up threading resources that do no useful work. Multiple observeOn calls in a stream, however, are meaningful because they let you 'pipeline' the processing across threads.
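
A likely explanation for the debounce variant in the question: in RxJava, debounce(long, TimeUnit) runs its timer on Schedulers.computation() by default, so the items it emits arrive on a computation thread, and the map() after it runs there instead of on the main thread. The plain-Java sketch below only illustrates that effect and is not RxJava code; the class name DelayedDeliverySketch and the thread name "fake-computation" are hypothetical, and a ScheduledExecutorService stands in for the scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedDeliverySketch {

    /** No delay operator: the "map" step runs on the thread that emitted the event. */
    public static String direct() {
        return Thread.currentThread().getName();
    }

    /** Delayed delivery: the "map" step runs on the timer's thread, not the caller's. */
    public static String delayed() {
        // Hypothetical stand-in for the scheduler that the delay operator uses.
        ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(task -> new Thread(task, "fake-computation"));
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        try {
            // The event is handed to the timer and delivered 10 ms later on its thread.
            timer.schedule(() -> {
                ranOn.complete(Thread.currentThread().getName());
            }, 10, TimeUnit.MILLISECONDS);
            return ranOn.join(); // block until the delayed step has run
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct ran on:  " + direct());
        System.out.println("delayed ran on: " + delayed());
    }
}
```

If this is what happens, the delay itself is incidental: the thread switch is what avoids the exception. An explicit observeOn(Schedulers.io()) states that intent directly and keeps blocking network I/O off the computation threads.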

Upvotes: 10
