Reputation: 6700
I'm having problems dealing with ConnectableFlowable and Threads:
My retrofit service returns a ConnectableFlowable:
service.searchBeers(config.getKey(), name)
.map(Mappers.SEARCH_MAPPER).publish();
In my Interactor class I subscribe to it to add data to a Firebase database, and then return the ConnectableFlowable to the presenter. I call subscribeOn, unsubscribeOn and observeOn with the IO scheduler, since the Firebase insertions don't need to run on the main thread.
@Override
public Flowable<LocalType<List<Beer>>> searchBeers(String query) {
    ConnectableFlowable<LocalType<List<Beer>>> connectableFlowable =
            (ConnectableFlowable<LocalType<List<Beer>>>) remote.search(query);
    connectableFlowable
            .unsubscribeOn(schedulerProvider.io())
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.io())
            .subscribe(onNext, onError);
    return connectableFlowable;
}
In my Presenter I subscribe to it to update the View, so I call observeOn with schedulerProvider.ui() (a wrapper around mainThread).
@Override
public void search(String query) {
    ConnectableFlowable<LocalType<List<Beer>>> flowable =
            (ConnectableFlowable<LocalType<List<Beer>>>) searchInteractor.searchBeers(query);
    flowable
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .unsubscribeOn(schedulerProvider.io())
            .subscribe(
                    beerListData -> searchView.showSearchResult(beerListData.getData()),
                    error -> searchView.showSearchError(error)
            );
    flowable.connect();
}
But when it calls flowable.connect() it raises a NetworkOnMainThreadException:
E/SearchActivity: showSearchError: null
android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1303)
at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:86)
at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:74)
at java.net.InetAddress.getAllByName(InetAddress.java:752)
at okhttp3.Dns$1.lookup(Dns.java:39)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall.execute(RealCall.java:69)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29)
at io.reactivex.Flowable.subscribe(Flowable.java:12994)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:32)
at io.reactivex.Flowable.subscribe(Flowable.java:12994)
at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
at io.reactivex.Flowable.subscribe(Flowable.java:12994)
at io.reactivex.internal.operators.flowable.FlowablePublish.connect(FlowablePublish.java:130)
at io.reactivex.flowables.ConnectableFlowable.connect(ConnectableFlowable.java:64)
at com.github.alexpfx.udacity.beercollection.beer.search.DefaultSearchPresenter.search(DefaultSearchPresenter.java:50)
at com.github.alexpfx.udacity.beercollection.SearchActivity.actionSearch(SearchActivity.java:63)
at com.github.alexpfx.udacity.beercollection.SearchActivity_ViewBinding$1.doClick(SearchActivity_ViewBinding.java:35)
at butterknife.internal.DebouncingOnClickListener.onClick(DebouncingOnClickListener.java:22)
at android.view.View.performClick(View.java:6261)
at android.view.View$PerformClick.run(View.java:23748)
at android.os.Handler.handleCallback(Handler.java:751)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6776)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1496)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1386)
I don't understand what's going wrong, since I expected the subscription not to run on the main thread.
-- My SchedulerProvider class:
@Singleton
public class AndroidSchedulerProvider implements SchedulerProvider {
    @Inject
    public AndroidSchedulerProvider() {
    }

    @Override
    public Scheduler computation() {
        return Schedulers.computation();
    }

    @Override
    public Scheduler io() {
        return Schedulers.io();
    }

    @Override
    public Scheduler ui() {
        return AndroidSchedulers.mainThread();
    }
}
Upvotes: 0
Views: 320
Reputation: 10267
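You should put the .subscribeOn(schedulerProvider.io()) before the publish(), so that the underlying service.searchBeers Flowable is subscribed to on the io thread. When you put subscribeOn after publish(), it only affects the subscription to the ConnectableFlowable itself. The upstream subscription, which performs the network request, is made by connect() on whichever thread calls it. In your case that is the main thread, as the stack trace shows (FlowablePublish.connect is called from DefaultSearchPresenter.search).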
service.searchBeers(config.getKey(), name)
.map(Mappers.SEARCH_MAPPER)
.subscribeOn(schedulerProvider.io())
.publish();
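To illustrate why the position of subscribeOn matters, here is a simplified model in plain Java. It is only a sketch and not RxJava's implementation: Source, Published, subscribeOn, sourceThread and the "io-thread" name are hypothetical stand-ins, and the "network call" just reports the thread it runs on. It assumes, as the stack trace suggests, that connect() subscribes to the upstream on the calling thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ConnectThreadDemo {

    /** Hypothetical stand-in for a cold source: it does its work when subscribed to. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Hypothetical stand-in for subscribeOn: moves the upstream subscription onto the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    /** Hypothetical stand-in for publish(): subscribers are only registered until connect() runs. */
    static final class Published<T> {
        private final Source<T> upstream;
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        Published(Source<T> upstream) {
            this.upstream = upstream;
        }

        // Registering a subscriber does not touch the upstream at all.
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // The upstream is subscribed to on whatever thread calls connect().
        void connect() {
            upstream.subscribe(item -> subscribers.forEach(s -> s.accept(item)));
        }
    }

    /** Returns the name of the thread on which the simulated network call ran. */
    static String sourceThread(boolean subscribeOnBeforePublish) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            // Simulated network call: emits the name of the thread it executes on.
            Source<String> network =
                    downstream -> downstream.accept(Thread.currentThread().getName());
            Source<String> upstream =
                    subscribeOnBeforePublish ? subscribeOn(network, io) : network;

            Published<String> published = new Published<>(upstream);
            String[] result = new String[1];
            CountDownLatch done = new CountDownLatch(1);
            published.subscribe(name -> {
                result[0] = name;
                done.countDown();
            });
            published.connect();
            done.await();
            return result[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        // Without subscribeOn before publish: the work runs on the thread calling connect().
        System.out.println("without subscribeOn: " + sourceThread(false));
        // With subscribeOn before publish: the work runs on "io-thread".
        System.out.println("with subscribeOn:    " + sourceThread(true));
    }
}
```

In this model the first call reports the caller's own thread and the second reports "io-thread", which mirrors the difference between the code in the question and the corrected chain above.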
Upvotes: 2