Reputation: 686
The following Observable returns the desired result, but I would like to start the async work in parallel.
What is the correct way of doing it with Rx?
import RxSwift

func delay(time: Int, closure: () -> Void) {
    dispatch_after(
        dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
        dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
    return Observable.create() { (observer) -> Disposable in
        print(desc)
        delay(time) {
            observer.onNext(value)
            observer.onCompleted()
        }
        return NopDisposable.instance
    }
}

let seq = Observable
    .of(1, 2, 3, 4, 5)
    .map { (n) -> Observable<Int> in
        return doAsyncWork(n,
            desc: "start \(n) - wait \(5 - n)",
            time: 6 - n
        )
    }
    .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }
This produces
//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5
The desired output would be
//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5
Upvotes: 2
Views: 2574
Reputation: 18888
The reason this doesn't work as you expected is that concat subscribes to the source observables one at a time, waiting for the first to finish before it subscribes to the second, and so on.
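To make the one-at-a-time behaviour concrete, here is a minimal sketch in plain Swift. It is a model only and does not use the RxSwift API: the names `Source`, `makeSource`, `subscribe(at:)` and `runSequentialModel` are hypothetical, and asynchronous completion is simulated with a queue of pending closures rather than real timers.

```swift
// Plain-Swift model, not RxSwift API.
// A "cold" source does nothing until it is subscribed to (called).
typealias Source = (@escaping (Int) -> Void) -> Void

func runSequentialModel() -> [String] {
    var log: [String] = []
    var pending: [() -> Void] = []   // simulated completions that fire "later"

    // Subscribing logs "start n" immediately and delivers n later.
    func makeSource(_ n: Int) -> Source {
        return { onDone in
            log.append("start \(n)")
            pending.append { onDone(n) }
        }
    }

    let sources = (1...3).map(makeSource)

    // concat-like behaviour: subscribe to source i + 1 only after source i completes.
    func subscribe(at index: Int) {
        guard index < sources.count else { return }
        sources[index] { value in
            log.append("=> \(value)")
            subscribe(at: index + 1)
        }
    }
    subscribe(at: 0)

    // Drain the simulated asynchronous completions.
    while !pending.isEmpty {
        let complete = pending.removeFirst()
        complete()
    }
    return log
}

print(runSequentialModel())
// ["start 1", "=> 1", "start 2", "=> 2", "start 3", "=> 3"]
```

Each "start" line appears only after the previous value has been delivered, which is the interleaving shown in the question's actual output.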
In RxJava there is concatEager, which does what you want: it subscribes to all sources at the start while still preserving order. RxSwift does not seem to have an equivalent.
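The eager behaviour described above can be sketched in plain Swift as follows. This is an illustrative model, not RxJava's or RxSwift's implementation: `Source`, `makeSource` and `runEagerModel` are hypothetical names, and completions are simulated by draining a list of closures in reverse order so that the last source finishes first, as in the question.

```swift
// Plain-Swift model of "subscribe to everything up front, emit in source order".
typealias Source = (@escaping (Int) -> Void) -> Void

func runEagerModel() -> [String] {
    var log: [String] = []
    var pending: [() -> Void] = []   // simulated completions that fire "later"

    // Subscribing logs "start n" immediately and delivers n later.
    func makeSource(_ n: Int) -> Source {
        return { onDone in
            log.append("start \(n)")
            pending.append { onDone(n) }
        }
    }

    let sources = (1...3).map(makeSource)

    var buffered: [Int: Int] = [:]   // results that arrived before their turn
    var nextIndex = 0                // position of the next value allowed out

    // Subscribe to every source right away, so all the work starts immediately.
    for (index, source) in sources.enumerated() {
        source { value in
            buffered[index] = value
            // Release every buffered value whose predecessors have been emitted.
            while let ready = buffered.removeValue(forKey: nextIndex) {
                log.append("=> \(ready)")
                nextIndex += 1
            }
        }
    }

    // Complete in reverse order: the last source finishes first.
    while let complete = pending.popLast() {
        complete()
    }
    return log
}

print(runEagerModel())
// ["start 1", "start 2", "start 3", "=> 1", "=> 2", "=> 3"]
```

All "start" lines come first and the values still appear in source order, at the cost of buffering results that finish early.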
What you could do is zip each item with its index, flatMap, sort on index and unzip.
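The index-and-sort idea can be illustrated with plain Swift collections. This is only a sketch of the data flow, not RxSwift code: `restoreOrder` is a hypothetical helper, and the out-of-order arrival that flatMap would produce is simulated by reversing the array.

```swift
// Restore the original order from results that arrived in completion order.
func restoreOrder(_ completed: [(index: Int, value: Int)]) -> [Int] {
    return completed
        .sorted { $0.index < $1.index }   // sort on index
        .map { $0.value }                 // "unzip": drop the index again
}

let inputs = [1, 2, 3, 4, 5]

// "Zip" each item with its position in the source sequence.
let tagged = inputs.enumerated().map { (index: $0.offset, value: $0.element) }

// A merging operator delivers results in completion order;
// here the last item is assumed to finish first.
let completionOrder = Array(tagged.reversed())

print(restoreOrder(completionOrder))
// [1, 2, 3, 4, 5]
```

One consequence of this approach is that sorting needs every result to be present, so nothing can be emitted until the slowest item has finished.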
Upvotes: 0
Reputation: 7729
This won't help you right now, but maybe it will help others in the future.
The operator you're looking for is called concatMap. However, at the moment, it doesn't exist in RxSwift.
There currently exists a closed PR for it here.
Upvotes: 0
Reputation: 686
This seems to work, though I'm not sure it is the best answer:
import RxSwift

func delay(time: Int, closure: () -> Void) {
    dispatch_after(
        dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
        dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
    return Observable.create() { (observer) -> Disposable in
        print(desc)
        delay(time) {
            observer.onNext(value)
            observer.onCompleted()
        }
        return NopDisposable.instance
    }
}

let seq = Observable
    .of(1, 2, 3, 4, 5)
    .map { (n) -> Observable<Int> in
        let o = doAsyncWork(n,
            desc: "start \(n) - wait \(5 - n)",
            time: 6 - n
        ).shareReplay(1)
        o.subscribe()
        return o.asObservable()
    }
    .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }
Upvotes: 2
Reputation: 7729
Your "desired output" seems to disagree with your desire to have the Observables start "in parallel", yet delay their elements such that "5" has no delay, "4" has a 1 second delay, "3" has a 2 second delay, etc.
I would think you're looking for this output:
start 1 - wait 4
start 2 - wait 3
start 3 - wait 2
start 4 - wait 1
start 5 - wait 0
5
4
3
2
1
This is something you could use to do that:
Observable.range(start: 1, count: 5)
    .flatMap { n -> Observable<Int> in
        let waitInterval = 5 - n
        print("start \(n) - wait \(waitInterval)")
        return Observable.just(n)
            .delaySubscription(RxTimeInterval(waitInterval), scheduler: MainScheduler.instance)
    }
    .subscribeNext { i in
        print(i)
    }
    .addDisposableTo(disposeBag)
If you meant something else, you could probably easily tweak this snippet to accomplish your goal.
Upvotes: 1