巩小鹏

Reputation: 69

RxSwift async task

I want to start a login task when a login button is tapped and, once it finishes, fetch the user's order list, shipping addresses, wish list, and other info. startTask is the button: when the user taps it, I start these tasks. But if the login task fails and the user taps startTask again, the tasks don't start again. Why?
Sample code

private func test() {

    let data = ["fetch order list", "fetch shipping addresses", "fetch wishlist", "fetch other info"]

    let fetchInfoTasks = data.map{ asyncTask($0) }.toObservable()
    let someTasks = fetchInfoTasks.merge().toArray()
    let result = login().flatMapLatest{ _ in someTasks }

    startTask
        .rx_tap
        .flatMapLatest{ result }
        .catchError{ error in
            .....error......
            return Observable.empty()
        }
        .subscribeNext{ tasks in
            .....all completed....
        }
        .addDisposableTo(disposeBag)
}

private func login()-> Observable<String> {
    return Observable.create{ observer in
        performClosure(afterDealy: 1, onMainQueue: false) {
            if arc4random() % 4 == 0 {
                observer.onNext("login finished")
                observer.onCompleted()
            } else {
                observer.onError(NSError(domain: "", code: -1, userInfo: [NSLocalizedDescriptionKey: "some error"]))
            }
        }
        return AnonymousDisposable{}
    }
}

private func asyncTask(name: String)-> Observable<String> {
    return Observable.create{ observer in
        let delay = Double(arc4random() % 6 + 1)
        performClosure(afterDealy: delay, onMainQueue: false) {
            observer.onNext(name)
            observer.onCompleted()
        }
        return AnonymousDisposable{}
    }
}

func performClosure(afterDealy delay: Double, onMainQueue mainQueueOrNot: Bool, action: dispatch_block_t) {
    let delayIntervals = Double(NSEC_PER_SEC) * delay
    let time = dispatch_time(DISPATCH_TIME_NOW, Int64(delayIntervals))
    let queue = mainQueueOrNot ? dispatch_get_main_queue() : dispatch_get_global_queue(QOS_CLASS_UTILITY, 0)
    dispatch_after(time, queue, action)
}

Upvotes: 4

Views: 5164

Answers (2)

NikGreen

Reputation: 700

As stated in catchError documentation:

Returns an observable sequence containing the source sequence's elements, followed by the elements produced by the handler's resulting observable sequence in case an error occurred.

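Thus, when an error occurs in your login chain, catchError replaces the whole sequence (button taps included) with the Observable.empty() returned from its closure. That sequence completes immediately, so the subscription is disposed and later taps are never observed.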

Here's a sample output of what's happening:

2016-05-27 10:03:18.634: AppDelegate.swift:59 (test()) -> subscribed
2016-05-27 10:03:19.792: AppDelegate.swift:59 (test()) -> Event Error(Error Domain= ...ription=some error})
2016-05-27 10:03:19.796: AppDelegate.swift:59 (test()) -> disposed

What worked for me in similar cases is to move the catchError handler inside the closure which actually does some work (e.g. executing network request) and return some kind of a Result enum to handle the error in subscribeNext.

Here's the adjusted test() function that uses the technique I've described (a timer stands in for the button taps):

enum Result<T>
{
    case Success(value: T)
    case Failure(error: ErrorType)
}

private func test() {

    let data = ["fetch order list", "fetch shipping addresses", "fetch wishlist", "fetch other info"]

    let fetchInfoTasks = data.map{ asyncTask($0) }.toObservable()
    let someTasks = fetchInfoTasks.merge().toArray()
    let result = login().flatMapLatest{ _ in someTasks }


    let resultHandled = result.map{ Result.Success(value: $0) }
                                .catchError { .just(Result.Failure(error: $0)) }


    let startTask = Observable<Int>.timer(0, period: 5, scheduler: MainScheduler.instance)

    startTask
        .flatMapLatest{ _ in resultHandled }
        .debug()
        .subscribeNext{ (result) in
            print("\(result)")
        }
        .addDisposableTo(disposeBag)
}

And the output is:

2016-05-27 10:07:25.507: AppDelegate.swift:59 (test()) -> subscribed

2016-05-27 10:07:26.614: AppDelegate.swift:59 (test()) -> Event Next(Failure(Error D...iption=some error}))
Failure(Error Domain= Code=-1 "some error" UserInfo={NSLocalizedDescription=some error})

2016-05-27 10:07:34.878: AppDelegate.swift:59 (test()) -> Event Next(Success(["fetch...ipping addresses"]))
Success(["fetch wishlist", "fetch order list", "fetch other info", "fetch shipping addresses"])

2016-05-27 10:07:41.603: AppDelegate.swift:59 (test()) -> Event Next(Failure(Error D...iption=some error}))
Failure(Error Domain= Code=-1 "some error" UserInfo={NSLocalizedDescription=some error})

2016-05-27 10:07:46.588: AppDelegate.swift:59 (test()) -> Event Next(Failure(Error D...iption=some error}))
Failure(Error Domain= Code=-1 "some error" UserInfo={NSLocalizedDescription=some error})

As you can see, the main sequence (a timer in my case, the button-tap sequence in yours) doesn't error out, and you can handle both outcomes in subscribeNext. Hope that helps!
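For readers on a newer toolchain, here is a minimal sketch of the same idea, assuming RxSwift 5 or later and Swift's built-in Result type instead of a hand-written enum. The names LoginError, LoginResult, makeLogin and collectResults are hypothetical and only serve the illustration, and a PublishSubject stands in for the button taps. In RxSwift 6 catchError is deprecated in favour of catch.

```swift
import RxSwift

struct LoginError: Error {}
typealias LoginResult = Result<String, Error>

// Hypothetical login: fails on odd-numbered attempts, succeeds on even ones.
func makeLogin() -> () -> Observable<String> {
    var attempt = 0
    return {
        Observable<String>.deferred {
            attempt += 1
            return attempt % 2 == 1
                ? Observable<String>.error(LoginError())
                : Observable<String>.just("login finished")
        }
    }
}

// Sends `tapCount` simulated taps and records what the subscriber receives.
func collectResults(tapCount: Int) -> [String] {
    let taps = PublishSubject<Void>()   // stand-in for the button's tap sequence
    let login = makeLogin()
    var log: [String] = []

    let subscription = taps
        .flatMapLatest { _ in
            login()
                .map { LoginResult.success($0) }
                // The error becomes an ordinary element of the inner sequence,
                // so the outer tap sequence never sees an Rx error.
                .catchError { error in
                    Observable<LoginResult>.just(.failure(error))
                }
        }
        .subscribe(onNext: { result in
            switch result {
            case .success(let value): log.append("success: \(value)")
            case .failure: log.append("failure")
            }
        })

    for _ in 0..<tapCount { taps.onNext(()) }
    subscription.dispose()
    return log
}

print(collectResults(tapCount: 3))
```

With three taps the subscriber receives a failure, a success, and another failure, which shows the tap sequence surviving each failed login.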

Update

Here are some resources that you may find helpful:

  1. https://github.com/ReactiveX/RxSwift/tree/master/RxExample
  2. http://rx-marin.com/
  3. https://github.com/artsy/eidolon
  4. https://realm.io/news/ - they have several talks on RxSwift
  5. https://gist.github.com/JaviLorbada/4a7bd6129275ebefd5a6 - list of FRP resources
  6. http://slack.rxswift.org/ - almighty RxSwift community members:)

Upvotes: 2

cfraz89

Reputation: 161

Once an error occurs, the stream terminates. You don't want this to happen at the level of your button, so you must catch errors at a deeper level.

For example:

startTask
    .rx_tap
    .flatMapLatest{
        result
            .catchError{ error in
                 .....error......
                 return Observable.empty()
            }
    }
    .subscribeNext{ tasks in
        .....all completed....
    }
    .addDisposableTo(disposeBag)

This way you can prevent an actual Rx error from bubbling up through the flatMap.
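To make the difference concrete, here is a small side-by-side sketch, assuming RxSwift 5 or later (in RxSwift 6 catchError is deprecated in favour of catch). LoginError, makeLogin and run are hypothetical names for this illustration, and a PublishSubject plays the role of the button taps. The simulated login fails on the first attempt and succeeds afterwards.

```swift
import RxSwift

struct LoginError: Error {}

// Hypothetical login: fails on the first subscription, succeeds on later ones.
func makeLogin() -> () -> Observable<String> {
    var attempt = 0
    return {
        Observable<String>.deferred {
            attempt += 1
            return attempt == 1
                ? Observable<String>.error(LoginError())
                : Observable<String>.just("login finished")
        }
    }
}

// Pushes two simulated taps through the pipeline and records every event.
func run(catchInside: Bool) -> [String] {
    let taps = PublishSubject<Void>()   // stand-in for the button's tap sequence
    let login = makeLogin()
    var log: [String] = []

    let pipeline: Observable<String>
    if catchInside {
        // The error is handled inside flatMapLatest; only the inner sequence ends.
        pipeline = taps.flatMapLatest { _ in
            login().catchError { _ in Observable<String>.empty() }
        }
    } else {
        // The error escapes flatMapLatest and replaces the whole tap sequence.
        pipeline = taps
            .flatMapLatest { _ in login() }
            .catchError { _ in Observable<String>.empty() }
    }

    let subscription = pipeline.subscribe(
        onNext: { log.append($0) },
        onCompleted: { log.append("completed") }
    )
    taps.onNext(())   // first tap: login fails
    taps.onNext(())   // second tap: login would succeed
    subscription.dispose()
    return log
}

print(run(catchInside: false))
print(run(catchInside: true))
```

With the outer catch, the only event is the completion triggered by the first failure, and the second tap is lost. With the inner catch, the first failure is swallowed and the second tap delivers "login finished".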

If you need to do something on error, you will probably want to wrap your result in some kind of result enum (I recommend https://github.com/antitypical/Result).

An example of this would be:

startTask
    .rx_tap
    .flatMapLatest{
        result
            .map { Result.Success(result: $0) }
            // catchError must return an Observable, so wrap the failure in one
            .catchError{ error in Observable.just(Result.Error(error: error)) }
    }
    .subscribeNext{ result in
        switch result {
        case .Success(let result):
            // display happy case
            break
        case .Error(let error):
            // display sad case
            break
        }
    }
    .addDisposableTo(disposeBag)

Upvotes: 3
