johnny_crq
johnny_crq

Reputation: 4391

RxJava Chained Observables and NetworkMainThreadException

So I have this code:

public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) {
    return Observable.<AbstractXMPPConnection>create(subscriber -> {
        try {
            AbstractXMPPConnection connection2 = connection.connect();
            if (connection2.isConnected()) {
                subscriber.onNext(connection2);
                subscriber.onCompleted();
            }
        } catch (SmackException | IOException | XMPPException e) {
            e.printStackTrace();
            subscriber.onError(e);
        }
    })
    .doOnError(throwable -> LOGI("111", "Connection OnError called"));
}


public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) {
       return connect(connection)
               .retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer))
                       .flatMap(pair -> {
                           if (pair.second == MAX_LOGIN_TRIES)
                               return Observable.error(pair.first);
                           return Observable.timer(pair.second, TimeUnit.SECONDS);
                       }));
    }


public void connect() {
        assertTrue("To start a connection to the server, you must first call init() method!",
                this.connectionConfig != null);

        connectionHelper.connectWithRetry(connection)
                .observeOn(Schedulers.newThread())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<AbstractXMPPConnection>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                        LOGI(TAG, "ConnectionHelper Connection onError\n");

                        /**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */
                        MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent());
                    }

                    @Override
                    public void onNext(AbstractXMPPConnection connection) {
                        LOGI(TAG, "ConnectionHelper Connection onNext");
//                        onConnected();
                    }
                });
    }

I have some questions about chaining observables. Imagining this scenario, in which I have a connect Observable, which sometimes I use, but I use mainly the connectWithRetry() Observable.

My question is, what would happen if a added this:

.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())

To both the connect() and connectWithRetry()? In this scenario, when I call public void connect and specify a scheduler, the previous ones are ignored?

And why am I getting NetworkOnMainThreadException? The explicit observeOn(Schedulers.newThread()) is there, it shouldnt be giving me that error

Upvotes: 3

Views: 177

Answers (2)

Adam S
Adam S

Reputation: 16394

I'll address your NetworkOnMainThread issue first.

observeOn(Schedulers.newThread()) means the output will be observed on a new thread - that is, the code in your subscriber (onComplete/Error/Next) will be run on that thread.

subscribeOn(AndroidSchedulers.mainThread() means subscription will happen on the main thread - the code in your created observable (connection.connect() etc) is what is run when subscription happens.

So simply swap the schedulers:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

So to address your first question, they're not ignored, they're just being used incorrectly. Hopefully from this you can see what would happen if you moved similar calls in to the chain inside your methods that return observables: nothing different to what you've already done. The calls would simply be in a different place.

So where to put the scheduler selection? That's up to you. You may gain increased clarity by not having the subscribeOn call inside the methods for creating your observables:

 connectionHelper.connectWithRetry(connection)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

However, if you feel like you're calling this everywhere for no reason, you can instead move the subscribeOn call inside your methods:

return connect(connection)
           .retryWhen(...)
           .flatMap(...)
           .subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread());

Note that these don't have to be bundled up together like this - you could subscribeOn inside your method, but leave observeOn up to any callers that want their results on a specific thread.

Upvotes: 1

Darshan Mistry
Darshan Mistry

Reputation: 3372

Please try Schedulers.io() may be issue resolve.

Upvotes: 0

Related Questions