Reputation: 627
I have a BehaviorSubject
named createObservable
in my view model. And my view controller subscribe it.
viewModel!.createObservable.subscribe(onNext: {[unowned self] (obj:PassbookModelType?) -> Void in
if let _ = obj{
self.dismissVC()
}
}, onError: { (error) -> Void in
print(error)
}).addDisposableTo(self.dispose)
I have a function named saveObject()
also in the view model. If I click the navigation bar right item it will be emitted. And there is an error will send to createObservable
's observer.
func saveObject(){
```````
```````
if condition {
createObservable.on(Event.Next(model))
createObservable.onCompleted()
}else{
createObservable.onError(MyError.someError)
}
}
The problem is that if the error happened the createObservable will be closed, so I won't receive any Next
event in the future. I tried to use retry()
, but it seems will cause deadlock, view controller can't response any touch event any more. So can some one tell me how to fix this issue? Thanks a lot
viewModel!.createObservable.retry().subscribe(onNext: {[unowned self] (obj:PassbookModelType?) -> Void in
if let _ = obj{
self.dismissVC()
}
}, onError: { (error) -> Void in
print(error)
}).addDisposableTo(self.dispose)
Upvotes: 3
Views: 5771
Reputation: 2193
I suggest to make the type of createObservable
PublishSubject<Observable<PassbookModelType>>
, instead of BehaviorSubject<PassbookModelType?>
which, I guess, accidentally flattens two Rx streams conceptually separatable each other: the saveObject
process itself (an one-shot process) and starting the saveObject
process initiated by user action repeatedly. I've written a short example to demonstrate it.
let createObservable = PublishSubject<Observable<Int>>()
override func viewDidLoad() {
super.viewDidLoad()
createObservable.flatMap {
$0.map { obj in
print("success: \(obj)")
}
.catchError { err in
print("failure: \(err)")
return empty()
}
}.subscribe()
}
// Simulates an asynchronous proccess to succeed.
@IBAction func testSuccess(sender: UIView!) {
let oneShot = PublishSubject<Int>()
createObservable.onNext(oneShot)
callbackAfter3sec { res in
oneShot.onNext(1)
oneShot.onCompleted()
}
}
// Simulates an asynchronous process to fail.
@IBAction func testFailure(sender: UIView!) {
let oneShot = PublishSubject<Int>()
createObservable.onNext(oneShot)
callbackAfter3sec { res in
oneShot.onError(NSError(domain: "Error", code: 1, userInfo: nil))
}
}
func callbackAfter3sec(completion: Int -> ()) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, Int64(NSEC_PER_SEC * 3)), dispatch_get_main_queue()) {
completion(2)
}
}
There is an important merit with that: If the one-shot process would become in the Rx style (for example, like as callbackAfter3sec() -> Observable<Int>
) in the future, there were no need to re-write the use-side code like in the viewDidLoad
above. There is an only one change to do is to pass an Observable<>
object to createObservable.onNext(...)
.
Sorry for my poor English skill. I hope this makes sense to you.
Upvotes: 2