SMGhost

Reputation: 4037

RxJava timer throws timeout exception after subscribe has successfully executed

I have the following code:

repo.getObservable()
            .timeout(1, TimeUnit.MINUTES)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe {
                _isInProgress.value = true
            }
            .doFinally {
                _isInProgress.value = false
            }
            .subscribe(
                    {
                        Timber.d("Success")
                    },
                    {
                        Timber.e(it)
                    })
            .trackDisposable()

The problem is that I get the Success message after a couple of seconds, yet my preloader keeps waiting for 1 minute, and then the error callback of subscribe is executed. Is that expected behaviour? What can I do to stop the timeout once the success callback of subscribe has been executed?

P.S. The Observable returned from getObservable() is created like this: PublishSubject.create()

Upvotes: 2

Views: 1208

Answers (2)

akarnokd

Reputation: 70017

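If you only need one result, use take(1) before or after timeout. The timeout operator restarts its timer after every item, so a source that emits once and never completes still times out a minute later; take(1) completes the stream after the first item, which cancels the timer: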

repo.getObservable()
        .take(1) // <---------------------------------------------
        .timeout(1, TimeUnit.MINUTES)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe {
            _isInProgress.value = true
        }
        .doFinally {
            _isInProgress.value = false
        }
        .subscribe(
                {
                    Timber.d("Success")
                },
                {
                    Timber.e(it)
                })
        .trackDisposable()
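
To see the difference without waiting a real minute, here is a minimal sketch that drives timeout with a TestScheduler. It assumes RxJava 3 package names (in RxJava 2 the same classes live under io.reactivex), and the class name TakeOneTimeoutDemo and the run helper are made up for illustration. It skips the Android schedulers and the progress flag from the question.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TakeOneTimeoutDemo {

    // Emits one item into a never-completing subject, lets two virtual
    // minutes pass, and returns the events the subscriber saw.
    static List<String> run(boolean useTakeOne) {
        TestScheduler scheduler = new TestScheduler(); // virtual clock, no real waiting
        PublishSubject<String> subject = PublishSubject.create();

        // Hypothetical switch: with take(1) the stream completes after one item.
        Observable<String> source = useTakeOne ? subject.take(1) : subject;

        List<String> events = new ArrayList<>();
        source.timeout(1, TimeUnit.MINUTES, scheduler)
                .subscribe(
                        value -> events.add("next:" + value),
                        error -> events.add("error:" + error.getClass().getSimpleName()),
                        () -> events.add("complete"));

        subject.onNext("result");                     // the "Success" item
        scheduler.advanceTimeBy(2, TimeUnit.MINUTES); // no further items arrive
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [next:result, error:TimeoutException]
        System.out.println(run(true));  // [next:result, complete]
    }
}
```

Without take(1) the subscriber gets the item and then a TimeoutException once the second timer window expires; with take(1) it gets the item followed by completion, and nothing fires later.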

Upvotes: 2

jeprubio

Reputation: 18032

That is most likely because only onNext is called on the PublishSubject and onComplete never is. See this example:

PublishSubject<Integer> source = PublishSubject.create();

// It will get 1, 2, 3, 4 and onComplete
source.subscribe(getFirstObserver()); 

source.onNext(1);
source.onNext(2);
source.onNext(3);

// It will get 4 and onComplete for second observer also.
source.subscribe(getSecondObserver());

source.onNext(4);
source.onComplete();

Until onComplete is called, the observers keep waiting for more results, so the timeout timer stays armed. You can either dispose the subscription once you have received the result you were waiting for, or call onComplete on the PublishSubject when all the results have been sent.
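
Both options can be sketched with a TestScheduler so that no real minute has to pass. This assumes RxJava 3 package names (RxJava 2 uses io.reactivex instead); the class CompleteOrDisposeDemo and its two helper methods are hypothetical names used only for this illustration.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class CompleteOrDisposeDemo {

    // Subscribes with a one-minute timeout and records every event into `events`.
    private static Disposable subscribe(PublishSubject<String> subject,
                                        TestScheduler scheduler,
                                        List<String> events) {
        return subject.timeout(1, TimeUnit.MINUTES, scheduler)
                .subscribe(
                        value -> events.add("next:" + value),
                        error -> events.add("error:" + error.getClass().getSimpleName()),
                        () -> events.add("complete"));
    }

    // Option 1: the producer calls onComplete after the last result.
    static List<String> completeAfterResult() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> subject = PublishSubject.create();
        List<String> events = new ArrayList<>();
        subscribe(subject, scheduler, events);

        subject.onNext("result");
        subject.onComplete();                         // ends the stream, cancels the timer
        scheduler.advanceTimeBy(2, TimeUnit.MINUTES);
        return events;
    }

    // Option 2: the consumer disposes once it has what it needs.
    static List<String> disposeAfterResult() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> subject = PublishSubject.create();
        List<String> events = new ArrayList<>();
        Disposable disposable = subscribe(subject, scheduler, events);

        subject.onNext("result");
        disposable.dispose();                         // no terminal event is delivered
        scheduler.advanceTimeBy(2, TimeUnit.MINUTES);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(completeAfterResult()); // [next:result, complete]
        System.out.println(disposeAfterResult());  // [next:result]
    }
}
```

In neither case does a TimeoutException arrive. Note that disposing delivers no onComplete, but a doFinally placed upstream of subscribe, as in the question, still runs on disposal, so the progress flag would be reset either way.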

Upvotes: 2
