Reputation: 63
I'm trying to call API and add data to my LiveData, but after I subscribe, I get MainThreadException in OnError. Tried different Schedulers, but with no success. Added stacktrace from onError.
private void pullLocation(){
myLocationService.getLocation()
.flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
@Override
public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key");
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Subscriber<DistanceResponseModel>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe in DistanceViewModel called. ");
}
@Override
public void onNext(DistanceResponseModel distanceResponseModel) {
distanceLiveData.postValue(distanceResponseModel);
Log.d(TAG, "onNext in DistanceViewModel called. ");
}
@Override
public void onError(Throwable t) {
Log.e("YOUR_APP_LOG_TAG", "I got an error:", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete in DistanceViewModel called. ");
}
});
}
EDIT:
added detailed stacktrace with error code:
E/YOUR_APP_LOG_TAG: I got an error
android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:102)
at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:90)
at java.net.InetAddress.getAllByName(InetAddress.java:787)
at okhttp3.Dns.lambda$static$0(Dns.java:39)
at okhttp3.-$$Lambda$Dns$9evC3uO-H_z08sS9O-4-hLhZ8es.lookup(Unknown Source:0)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:135)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:187)
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
at okhttp3.RealCall.execute(RealCall.java:81)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:35)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
at io.reactivex.Single.subscribe(Single.java:3394)
at io.reactivex.internal.operators.flowable.FlowableFlatMapSingle$FlatMapSingleSubscriber.onNext(FlowableFlatMapSingle.java:132)
at io.reactivex.internal.operators.flowable.FlowableHide$HideSubscriber.onNext(FlowableHide.java:68)
at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
at com.example.compass.viewModels.MyLocationServiceClass$1.onLocationChanged(MyLocationServiceClass.java:35)
at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
at android.location.LocationManager$ListenerTransport$1.handleMessage(LocationManager.java:237)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:164)
at android.app.ActivityThread.main(ActivityThread.java:6494)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)
This where I get location:
public class MyLocationServiceClass implements MyLocationService {
PublishProcessor<Location> stream = PublishProcessor.create();
LocationListener listener = new LocationListener() {
@Override
public void onLocationChanged(@NonNull Location location) {
stream.onNext(location);
Log.d(TAG, "onLocationChanged: stream.onNext(location) called");
}
@Override
public void onProviderEnabled(@NonNull String provider) {
}
@Override
public void onProviderDisabled(@NonNull String provider) {
}
@Override
public void onStatusChanged(String provider, int status, Bundle extras) {
}
};
private LocationManager locationManager;
private boolean flag;
private Context context;
public MyLocationServiceClass(Context context, LocationManager locationManager, boolean flag) {
this.context = context;
this.locationManager = locationManager;
this.flag = flag;
}
@OnLifecycleEvent(Lifecycle.Event.ON_START)
void onStart() {
if (flag) {
start();
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_STOP)
void onStop() {
locationManager.removeUpdates(listener);
}
@Override
public Flowable<Location> getLocation() {
return stream.hide();
}
@Override
public void start() {
long minTimeMs = 10000;
float minDistanceM = 5.5F;
if (ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) != PackageManager.PERMISSION_GRANTED && ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_COARSE_LOCATION) != PackageManager.PERMISSION_GRANTED) {
return;
}
Log.d(TAG, "start(): requestLocationUpdates called.");
locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener);
}
@Override
public void updatePermission(boolean newState) {
flag = newState;
}
public interface MyLocationService extends LifecycleObserver {
Flowable<Location> getLocation();
void start();
void updatePermission(boolean newState);
}
Any idea what could it be? Let me know if you want more details. Hope this will help.
repo with whole code: https://github.com/LightingTT/Compass
Upvotes: 1
Views: 592
Reputation: 4002
An onError (NetworkOnMainThreadException) is propagated to the subscriber, when calling distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
.
Doing network requests on the UI-Eventloop is prohibited, in order not to block the UI.
subscribeOn
calls the subscribeActual method of a operator upstream on given Scheduler. In your case subscribe for flatMapSingle
operator is called from a worker-thread in the IO-Scheduler. flatMapSingle
calls myLocationService.getLocation()
on the same thread.
As you can see in the stacktrace, the onNext from source-observable, which is returned from getLocation
, is emitted from the UI-thread
at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
at com.example.compass.viewModels.MyLocationServiceClass$1.onLocationChanged(MyLocationServiceClass.java:35)
at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
at android.location.LocationManager$ListenerTransport$1.handleMessage(LocationManager.java:237)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:164)
The value is emitted via onNext on the calling-thread (UI-Thread), because the callback is called from the UI-thread. The flatMapSingle
operator is also called on the calling UI-thread. It subscribes to the returned Single
from the lambda and subscribes to it on the calling thread. Therefore the UI-thread invokes the network-request on the UI-thread and the exception is thrown by the Android runtime. SubscribeOn
does not make sure, that onNext is downstream on given Scheduler
. It only makes sure, that the subscription happens in this thread. In your case, your source-observable will get notified by the Android runtime on the UI-Thread.
Use observeOn
right after myLocationService.getLocation()
in order to switch the thread from one operator to another. observeOn
makes sure, that the onNext
call downstream happens on given scheduler. Therefore the subscription to the inner stream in flatMapSingle
would happen on given scheduler worker-thread and not the ui-thread. You could also apply subscribeOn
on the Single
in flatMapSingle
. This would make sure, that the network-call would happen on given scheduler worker-thread.
.flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
@Override
public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
.subscribeOn(Schedulers.io());
}
})
http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html?m=1
Upvotes: 3