Piotr Zaremba

Reputation: 63

RxJava NetworkOnMainThreadException while subscribing

I'm trying to call an API and post the result to my LiveData, but after I subscribe I get a NetworkOnMainThreadException in onError. I tried different Schedulers, with no success. The stack trace from onError is included below.

private void pullLocation(){

    myLocationService.getLocation()
            .flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
                @Override
                public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
                    return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key");
                }
            })

            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(new Subscriber<DistanceResponseModel>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe in DistanceViewModel called. ");
                }

                @Override
                public void onNext(DistanceResponseModel distanceResponseModel) {
                    distanceLiveData.postValue(distanceResponseModel);
                    Log.d(TAG, "onNext in DistanceViewModel called. ");
                }

                @Override
                public void onError(Throwable t) {
                    Log.e("YOUR_APP_LOG_TAG", "I got an error:", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete in DistanceViewModel called. ");
                }
            });
}

EDIT:

Added the detailed stack trace of the error:

E/YOUR_APP_LOG_TAG: I got an error
    android.os.NetworkOnMainThreadException
        at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:102)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:90)
        at java.net.InetAddress.getAllByName(InetAddress.java:787)
        at okhttp3.Dns.lambda$static$0(Dns.java:39)
        at okhttp3.-$$Lambda$Dns$9evC3uO-H_z08sS9O-4-hLhZ8es.lookup(Unknown Source:0)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:135)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
        at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:187)
        at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
        at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
        at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
        at okhttp3.RealCall.execute(RealCall.java:81)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
        at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
        at io.reactivex.Observable.subscribe(Observable.java:12030)
        at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12030)
        at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
        at io.reactivex.Single.subscribe(Single.java:3394)
        at io.reactivex.internal.operators.flowable.FlowableFlatMapSingle$FlatMapSingleSubscriber.onNext(FlowableFlatMapSingle.java:132)
        at io.reactivex.internal.operators.flowable.FlowableHide$HideSubscriber.onNext(FlowableHide.java:68)
        at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
        at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
        at com.example.compass.viewModels.MyLocationServiceClass$1.onLocationChanged(MyLocationServiceClass.java:35)
        at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
        at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
        at android.location.LocationManager$ListenerTransport$1.handleMessage(LocationManager.java:237)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:164)
        at android.app.ActivityThread.main(ActivityThread.java:6494)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)

This is where I get the location:

public class MyLocationServiceClass implements MyLocationService {

    PublishProcessor<Location> stream = PublishProcessor.create();
    LocationListener listener = new LocationListener() {
        @Override
        public void onLocationChanged(@NonNull Location location) {
            stream.onNext(location);
            Log.d(TAG, "onLocationChanged: stream.onNext(location) called");
        }

        @Override
        public void onProviderEnabled(@NonNull String provider) {

        }

        @Override
        public void onProviderDisabled(@NonNull String provider) {

        }

        @Override
        public void onStatusChanged(String provider, int status, Bundle extras) {

        }
    };

    private LocationManager locationManager;
    private boolean flag;
    private Context context;

    public MyLocationServiceClass(Context context, LocationManager locationManager, boolean flag) {
        this.context = context;
        this.locationManager = locationManager;
        this.flag = flag;

    }

    @OnLifecycleEvent(Lifecycle.Event.ON_START)
    void onStart() {
        if (flag) {
            start();
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_STOP)
    void onStop() {
        locationManager.removeUpdates(listener);
    }


    @Override
    public Flowable<Location> getLocation() {
        return stream.hide();
    }

    @Override
    public void start() {
        long minTimeMs = 10000;
        float minDistanceM = 5.5F;


        if (ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) != PackageManager.PERMISSION_GRANTED && ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_COARSE_LOCATION) != PackageManager.PERMISSION_GRANTED) {

            return;
        }
        Log.d(TAG, "start(): requestLocationUpdates called.");
        locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener);

    }

    @Override
    public void updatePermission(boolean newState) {
        flag = newState;
    }
}

public interface MyLocationService extends LifecycleObserver {

    Flowable<Location> getLocation();

    void start();
    void updatePermission(boolean newState);
}

Any idea what it could be? Let me know if you need more details. I hope this helps.

Repo with the whole code: https://github.com/LightingTT/Compass

Upvotes: 1

Views: 592

Answers (1)

Sergej Isbrecht

Reputation: 4002

Problem

A NetworkOnMainThreadException is propagated to the subscriber's onError when distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key") is subscribed to.

Why is an exception thrown?

Android prohibits network requests on the UI thread (the main looper) so that the UI is not blocked.

Why does subscribeOn not help?

subscribeOn makes the subscription to the upstream operators (their subscribeActual calls) happen on the given Scheduler. In your case, flatMapSingle is subscribed to from a worker thread of the IO scheduler, and flatMapSingle in turn subscribes to the Flowable returned by myLocationService.getLocation() on that same thread.

As you can see in the stack trace, the onNext of the source Flowable, which is returned from getLocation, is emitted from the UI thread:

    at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
    at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
    at com.example.compass.viewModels.MyLocationServiceClass$1.onLocationChanged(MyLocationServiceClass.java:35)
    at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
    at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
    at android.location.LocationManager$ListenerTransport$1.handleMessage(LocationManager.java:237)
    at android.os.Handler.dispatchMessage(Handler.java:106)
    at android.os.Looper.loop(Looper.java:164)

The value is emitted via onNext on the calling thread, which is the UI thread, because the LocationListener callback is invoked there. The flatMapSingle operator therefore also runs on the UI thread: it calls the mapper function and subscribes to the returned Single on that thread. As a result the network request is executed on the UI thread and the Android runtime throws the exception. subscribeOn does not guarantee that onNext is delivered downstream on the given Scheduler; it only guarantees that the subscription happens on that Scheduler. In your case, the source is notified by the Android runtime on the UI thread, so that is where every emission starts.
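The point about emissions following the emitting thread can be shown with a small library-free model. This is only a sketch and not RxJava itself: HotSource is a hypothetical stand-in for the PublishProcessor, and the thread names "io-worker" and "fake-ui" are made up for illustration. The listener is registered from one thread (roughly what subscribeOn achieves) but still runs on the thread that calls emit.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Library-free model of a hot source; an illustration only, not RxJava. */
final class EmissionThreadDemo {

    /** Hypothetical stand-in for the location stream: listeners run on whichever thread calls emit(). */
    static final class HotSource {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        void emit(String value) {
            for (Consumer<String> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    /** Runs the task on a new thread with the given name and waits for it to finish. */
    static void runOn(String threadName, Runnable task) {
        Thread thread = new Thread(task, threadName);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Subscribes on "io-worker", emits on "fake-ui", and returns the thread the listener ran on. */
    static String listenerThread() {
        HotSource source = new HotSource();
        String[] seenOn = new String[1];

        // Rough analogue of subscribeOn(Schedulers.io()): only the registration happens on the worker.
        runOn("io-worker", () -> source.subscribe(v -> seenOn[0] = Thread.currentThread().getName()));

        // Rough analogue of onLocationChanged being invoked by the Android main looper.
        runOn("fake-ui", () -> source.emit("52.23,21.01"));

        return seenOn[0];
    }

    public static void main(String[] args) {
        System.out.println("listener ran on: " + listenerThread()); // prints "listener ran on: fake-ui"
    }
}
```

Registering from the worker thread changes nothing about where the listener body executes, which mirrors why the network call in the question still ends up on the UI thread.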

Solution?

Use observeOn right after myLocationService.getLocation() to switch threads between one operator and the next. observeOn makes sure that the downstream onNext calls happen on the given Scheduler, so the subscription to the inner Single in flatMapSingle happens on a worker thread of that Scheduler and not on the UI thread. Alternatively, you can apply subscribeOn to the Single returned inside flatMapSingle, which makes sure that the network call happens on a worker thread of the given Scheduler:

        .flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
            @Override
            public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
                return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
                                         .subscribeOn(Schedulers.io());
            }
        })
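The observeOn option can be pictured in the same library-free way. The sketch below is an assumption-laden model, not RxJava's implementation: hopTo is a hypothetical helper that re-dispatches every value to an executor before the downstream sees it, and the "io-worker" thread name is invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Library-free model of a thread hop between two stages; an illustration only, not RxJava. */
final class ThreadHopDemo {

    /** Wraps a listener so each value is handed to the executor: a rough observeOn analogue. */
    static <T> Consumer<T> hopTo(ExecutorService executor, Consumer<T> downstream) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    /** Delivers one value from the calling thread and returns the thread the downstream ran on. */
    static String downstreamThread() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        String[] seenOn = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        try {
            Consumer<String> listener = hopTo(io, v -> {
                // This is where the blocking "network call" would run in the real pipeline.
                seenOn[0] = Thread.currentThread().getName();
                done.countDown();
            });

            // Called on the caller's thread, like onLocationChanged on the UI thread.
            listener.accept("52.23,21.01");

            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("downstream was never called");
            }
            return seenOn[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("downstream ran on: " + downstreamThread()); // prints "downstream ran on: io-worker"
    }
}
```

In the real chain the equivalent change would be to place observeOn(Schedulers.io()) between getLocation() and flatMapSingle, so that everything after the hop, including the subscription to the inner Single, runs off the UI thread.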

Further reading

http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html?m=1

Upvotes: 3
