ImShrey

Reputation: 418

Dynamically terminate Observable based on another observable | RxSwift

I have an array of Observables, say [Observable<WriteTaskResult>].

I want to perform all write tasks in order, and if any one of them fails, I want to perform an Observable<ResetTaskResult>.

The following function returns an Observable of type BatchTasksResult for tracking the tasks' progress.

Sample Code:

enum BatchTasksResult{
    case elapsedTime(Double)
    case failedFatal
    case rolledback
    case success
}

func writeBlocks(tasks: [WriteTask]) -> Observable<BatchTasksResult>{
    return Observable.create {(observable) -> Disposable in
       let allTasks: [Observable<WriteTaskResult>] = self.writeSomewhere(tasks)
       Observable.concat(allTasks)
         .subscribe { writeTaskResult in
               observable.onNext(.elapsedTime(writeTaskResult.totalTime))
            } 
            onError: { (err) in
               // Perform Observable<ResetTaskResult>
               // if ResetTask was successful then observable.onNext(.rolledback)
               // if ResetTask failed then observable.onNext(.failedFatal)
            }
            onCompleted: {
               observable.onNext(.success)
            }
            .disposed(by: disposeBag)
       return Disposables.create()
    }
}

How do I trigger the rollback logic, using an Observable, from the onError of the allTasks observable?

The simple solution seems to be a nested observable, but I guess that's not good practice? I tried flatMap, but it can't really handle "if any single task fails, then roll back and reset". Is there any other solution?

Upvotes: 1

Views: 467

Answers (1)

Daniel T.

Reputation: 33967

There's no need to add the extra level of indirection with the create function. Every Observable operator already creates a new object.

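And when you do use Observable.create, do not put the inner subscription into an external dispose bag and then return Disposables.create(). Just return the disposable that the inner subscription gave you, so that disposing the outer subscription also cancels the inner work.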
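As a minimal sketch of that second point, assuming RxSwift is available: the helper name `forward` is hypothetical and exists only to illustrate returning the inner subscription's disposable from the create closure.

```swift
import RxSwift

// Hypothetical helper (not from the question): wraps a source observable
// using Observable.create, purely to show the disposal pattern.
func forward<T>(_ source: Observable<T>) -> Observable<T> {
    return Observable.create { observer in
        // Return the inner subscription's disposable directly. When the
        // outer subscription is disposed, the inner one is disposed too.
        // No external DisposeBag and no empty Disposables.create().
        return source.subscribe(
            onNext: { observer.onNext($0) },
            onError: { observer.onError($0) },
            onCompleted: { observer.onCompleted() }
        )
    }
}
```

With this shape, the lifetime of the inner work is owned by whoever subscribes to the returned observable.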

Here's the appropriate way to do what you want:

func writeBlocks(tasks: [WriteTask], resetTask: Single<ResetTaskResult>) -> Observable<BatchTasksResult> {
    // create the array of write tasks and concat them. You seem to have that down.
    let result = Observable.concat(tasks.map(writeSomewhere(task:)).map { $0.asObservable() })
        .share() // the share is needed because you are using the value twice below.
    return Observable.merge(
        // push out the elapsed time for each task.
        result.map { BatchTasksResult.elapsedTime($0.totalTime) },
        // when the last one is done, push out the success event.
        result.takeLast(1).map { _ in BatchTasksResult.success }
    )
    .catch { _ in
        resetTask // the resetTask will get subscribed to if needed.
            .map { _ in BatchTasksResult.rolledback } // if successful emit a rollback
            .catch { _ in Single.just(BatchTasksResult.failedFatal) } // otherwise emit the failure.
            .asObservable()
    }
}

func writeSomewhere(task: WriteTask) -> Single<WriteTaskResult> {
    // create a Single that performs the write and emits a result.
}
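To see the fallback behaviour in isolation, here is a small sketch assuming RxSwift 6 (where the operator is named `catch`). The `DemoError` type, the string payloads, and the `steps` and `reset` values are stand-ins invented for illustration; they are not the real write or reset tasks.

```swift
import RxSwift

// Hypothetical error type for the demo.
enum DemoError: Error { case writeFailed }

// Stand-in for the concatenated write tasks: two succeed, the third fails.
let steps: [Observable<String>] = [
    .just("write 1"),
    .just("write 2"),
    .error(DemoError.writeFailed)
]

// Stand-in for resetTask; it is only subscribed to if a write fails.
let reset = Single<String>.just("reset done")

var events: [String] = []
_ = Observable.concat(steps)
    .catch { _ in
        reset
            .map { _ in "rolledback" }                  // reset succeeded
            .catch { _ in Single.just("failedFatal") }  // reset itself failed
            .asObservable()
    }
    .subscribe(onNext: { events.append($0) })

print(events) // ["write 1", "write 2", "rolledback"]
```

The results emitted before the failure still reach the subscriber; the error is replaced by whatever the reset sequence emits, and the stream then completes normally.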

Upvotes: 1
