dev.farmer

Reputation: 2779

How to handle errors in multiple chained Observables in RxJava?

I am developing an Android app using Kotlin, RxJava, and Retrofit. I want to send two HTTP requests to the server:

  1. PUT - update the job's options
  2. POST - run the job

The second request should be sent only after the first one succeeds, so I used concatMap.

val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST

updateJob.concatMap { runJob }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        Log.d(TAG, "runJob - success: $job")
    }, {
        Log.e(TAG, "runJob - failed: ${it.message}")
        it.printStackTrace()
    })

I want to skip the second request if the first one fails. How should I do this?

Here is one possible approach, but I think the nested subscriptions are ugly. Is there a cleaner way?

disposable.add(
    restService.updateJob(token, job.id, options)    // PUT
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ job ->
            Log.d(TAG, "updateJob - success")
            restService.runJob(token, job.id)    // POST
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ job ->
                    Log.d(TAG, "runJob - success")
                }, {
                    Log.e(TAG, "runJob - failed: ${it.message}")
                    it.printStackTrace()
                })
        }, {
            Log.e(TAG, "updateJob - failed: ${it.message}")
            it.printStackTrace()
        })
)

I have one more question. I have a list of jobs and want to do the same thing for each of them. Even if some jobs fail, I want to continue with the remaining jobs. I considered onErrorResumeNext, onErrorReturn, and doOnError, but none of them solved the problem. How can I do this?

Observable.fromIterable(jobs)
    .concatMap { job ->
        val updateJob = restService.updateJob(token, job.id, options)
        val printJob = restService.printJob(token, job.id)

        updateJob.concatMap { printJob }
    }
    .window(1)    // I thought "window" can be the solution. But it doesn't work.
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        Log.d(TAG, "runJobs - success")
    }, {
        Log.e(TAG, "runJobs - failed: ${it.message}")
        it.printStackTrace()
    })

Upvotes: 0

Views: 589

Answers (1)

Prithvi Bhola

Reputation: 3151

Actually, you have already given the answer yourself. Your first snippet is correct:

val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST

updateJob.concatMap { runJob }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        Log.d(TAG, "runJob - success: $job")
    }, {
        Log.e(TAG, "runJob - failed: ${it.message}")
        it.printStackTrace()
    })

In this case, if the updateJob request fails, the stream goes straight to the error callback and the runJob request is never made.

runJob is called only when updateJob succeeds.

If updateJob succeeds and runJob then fails, the error callback is called as well.

There is no need for your second solution.
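
The behaviour described above can be checked without a server. The following is a minimal sketch, assuming RxJava 2 (the `io.reactivex` package). `fakeUpdateJob`, `fakeRunJob`, `chain`, and `runJobCalls` are hypothetical stand-ins for the Retrofit calls, not part of the original code. It counts how often the second request actually runs.

```kotlin
import io.reactivex.Observable

// Counts how many times the (fake) second request was really executed.
var runJobCalls = 0

// Hypothetical stand-in for restService.updateJob(...): fails on demand.
fun fakeUpdateJob(fail: Boolean): Observable<String> =
    if (fail) Observable.error<String>(IllegalStateException("update failed"))
    else Observable.just("updated")

// Hypothetical stand-in for restService.runJob(...).
fun fakeRunJob(): Observable<String> =
    Observable.fromCallable {
        runJobCalls++
        "started"
    }

// Chains the two requests the same way as the snippet above.
// No schedulers are used, so subscribe() completes synchronously.
fun chain(failUpdate: Boolean): String {
    var outcome = ""
    fakeUpdateJob(failUpdate)
        .concatMap { fakeRunJob() }
        .subscribe({ outcome = "success: $it" }, { outcome = "failed: ${it.message}" })
    return outcome
}

fun main() {
    println(chain(failUpdate = true))   // failed: update failed
    println(runJobCalls)                // 0, the second request was never made
    println(chain(failUpdate = false))  // success: started
    println(runJobCalls)                // 1
}
```

When the first Observable errors, the lambda passed to concatMap is never invoked, which is why the counter stays at 0.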

For your second question, onErrorResumeNext should work. Return a dummy value on error and handle it in onNext:

Observable.fromIterable(jobs)
    // On failure, emit a dummy value instead of terminating the whole stream
    .concatMap { job ->
        restService.updateJob(token, job.id, options)
            .onErrorResumeNext(Observable.just(/*Send any dummy value*/))
    }
    .concatMap { job ->
        restService.printJob(token, job.id)
            .onErrorResumeNext(Observable.just(/*Send any dummy value*/))
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        /*If dummy value received....handle it gracefully*/
        Log.d(TAG, "runJobs - success")
    }, {
        Log.e(TAG, "runJobs - failed: ${it.message}")
        it.printStackTrace()
    })
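
As a variation on the dummy-value idea, here is a minimal sketch, assuming RxJava 2 (`io.reactivex`), that keeps the fallback inside the per-job chain, so a failure ends only that job's inner Observable and the outer stream moves on to the next job. It uses onErrorReturn rather than onErrorResumeNext, and `JobResult`, `fakeUpdate`, `fakePrint`, and `runAll` are hypothetical names introduced only for illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical result type used in place of a bare dummy value.
sealed class JobResult {
    data class Ok(val jobId: Int) : JobResult()
    data class Failed(val jobId: Int, val reason: String) : JobResult()
}

// Hypothetical stand-in for restService.updateJob(...): job 2 always fails.
fun fakeUpdate(jobId: Int): Observable<Int> =
    if (jobId == 2) Observable.error<Int>(IllegalStateException("update failed"))
    else Observable.just(jobId)

// Hypothetical stand-in for restService.printJob(...).
fun fakePrint(jobId: Int): Observable<Int> = Observable.just(jobId)

fun runAll(jobIds: List<Int>): List<JobResult> =
    Observable.fromIterable(jobIds)
        .concatMap { id ->
            fakeUpdate(id)
                .concatMap { fakePrint(id) }
                .map<JobResult> { JobResult.Ok(id) }
                // The error is converted to a value inside the inner chain,
                // so the outer concatMap never sees onError for this job.
                .onErrorReturn { e -> JobResult.Failed(id, e.message ?: "unknown") }
        }
        .toList()
        .blockingGet() // blocking only to keep the sketch synchronous

fun main() {
    // Job 2 fails, jobs 1 and 3 still run.
    runAll(listOf(1, 2, 3)).forEach { println(it) }
}
```

In an Android app the blocking call would be replaced by subscribeOn/observeOn and a subscribe callback, and the subscriber would branch on `Ok` versus `Failed` in onNext.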

Upvotes: 1
