alexpfx
alexpfx

Reputation: 6700

NetworkOnMainThreadException when call connect from ConnectableFlowable

I'm having problems dealing with ConnectableFlowable and Threads:

My retrofit service returns a ConnectableFlowable:

service.searchBeers(config.getKey(), name)
                        .map(Mappers.SEARCH_MAPPER).publish();

In my Interactor Class I have subscribed to it to add data to a Firebase database and then I returned the ConnectableFlowable to presenter. I have called the methods subscribeOn, unsubscribeOn and observeOn passing IO thread, since the Firebase insertions don't need to execute in main thread.

@Override
    public Flowable<LocalType<List<Beer>>> searchBeers(String query) {
        ConnectableFlowable<LocalType<List<Beer>>> connectableFlowable = (ConnectableFlowable<LocalType<List<Beer>>>)
                remote.search(query);

        connectableFlowable.unsubscribeOn(schedulerProvider.io()).subscribeOn(schedulerProvider.io()).observeOn
                (schedulerProvider.io()).subscribe(onNext,
                onError);

        return connectableFlowable;
    }

In my Presenter I have subscribed to it to update the View, so I call observeOn and pass schedulerProvider.ui() (that is a wrapper to mainThread).

@Override
public void search(String query) {

    ConnectableFlowable<LocalType<List<Beer>>> flowable = (ConnectableFlowable<LocalType<List<Beer>>>)
            searchInteractor.searchBeers(query);
    flowable.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui()).unsubscribeOn
            (schedulerProvider.io())
            .subscribe(
                    beerListData -> searchView.showSearchResult(beerListData.getData()),
                    error -> searchView.showSearchError(error)
            );

    flowable.connect();
}

But when it calls flowable.connect it raises an NetworkOnMainThreadException:

E/SearchActivity: showSearchError: null
                  android.os.NetworkOnMainThreadException
                      at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1303)
                      at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:86)
                      at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:74)
                      at java.net.InetAddress.getAllByName(InetAddress.java:752)
                      at okhttp3.Dns$1.lookup(Dns.java:39)
                      at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
                      at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
                      at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
                      at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
                      at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
                      at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
                      at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
                      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
                      at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
                      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
                      at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
                      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                      at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
                      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
                      at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
                      at okhttp3.RealCall.execute(RealCall.java:69)
                      at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
                      at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
                      at io.reactivex.Observable.subscribe(Observable.java:10910)
                      at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
                      at io.reactivex.Observable.subscribe(Observable.java:10910)
                      at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29)
                      at io.reactivex.Flowable.subscribe(Flowable.java:12994)
                      at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:32)
                      at io.reactivex.Flowable.subscribe(Flowable.java:12994)
                      at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
                      at io.reactivex.Flowable.subscribe(Flowable.java:12994)
                      at io.reactivex.internal.operators.flowable.FlowablePublish.connect(FlowablePublish.java:130)
                      at io.reactivex.flowables.ConnectableFlowable.connect(ConnectableFlowable.java:64)
                      at com.github.alexpfx.udacity.beercollection.beer.search.DefaultSearchPresenter.search(DefaultSearchPresenter.java:50)
                      at com.github.alexpfx.udacity.beercollection.SearchActivity.actionSearch(SearchActivity.java:63)
                      at com.github.alexpfx.udacity.beercollection.SearchActivity_ViewBinding$1.doClick(SearchActivity_ViewBinding.java:35)
                      at butterknife.internal.DebouncingOnClickListener.onClick(DebouncingOnClickListener.java:22)
                      at android.view.View.performClick(View.java:6261)
                      at android.view.View$PerformClick.run(View.java:23748)
                      at android.os.Handler.handleCallback(Handler.java:751)
                      at android.os.Handler.dispatchMessage(Handler.java:95)
                      at android.os.Looper.loop(Looper.java:154)
                      at android.app.ActivityThread.main(ActivityThread.java:6776)
                      at java.lang.reflect.Method.invoke(Native Method)
                      at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1496)
                      at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1386)

I don't realize what's going wrong, since the subscribe is not executed in main thread.

-- My SchedulerProvider class:

@Singleton
public class AndroidSchedulerProvider implements SchedulerProvider {

    @Inject
    public AndroidSchedulerProvider() {
    }

    @Override
    public Scheduler computation() {
        return Schedulers.computation();
    }

    @Override
    public Scheduler io() {
        return Schedulers.io();
    }

    @Override
    public Scheduler ui() {
        return AndroidSchedulers.mainThread();
    }
}

Upvotes: 0

Views: 320

Answers (1)

yosriz
yosriz

Reputation: 10267

You should put the .subscribeOn(schedulerProvider.io()) before the publish(), so the underlying service.searchBeers Flowable will subscribe on io thread, when you put the subscribeOn after publish it will affect the subscription to the ConnectableFlowable itself only.

service.searchBeers(config.getKey(), name)
            .map(Mappers.SEARCH_MAPPER)
            .subscribeOn(schedulerProvider.io())
            .publish();

Upvotes: 2

Related Questions