Pierre

Reputation: 686

RxSwift map + concat in parallel

The code below returns the desired result, but it runs the async work one item at a time. I would like to start all of the async work in parallel while keeping the results in order.

What is the correct way of doing this with Rx?

import RxSwift

func delay(time: Int, closure: () -> Void) {
  dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
    dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
  return Observable.create() { (observer) -> Disposable in
    print(desc)
    delay(time) {
      observer.onNext(value)
      observer.onCompleted()
    }
    return NopDisposable.instance
  }
}

let seq = Observable
  .of(1, 2, 3, 4, 5)
  .map { (n) -> Observable<Int> in
    return doAsyncWork(n,
      desc: "start \(n) - wait \(5 - n)",
      time: 6 - n
    )
  }
  .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }

This produces

//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5

The desired output would be

//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5

Upvotes: 2

Views: 2574

Answers (4)

Alexander Torstling

Reputation: 18888

The reason why this doesn't work as you expected is that concat subscribes to the source observables one at a time, waiting for the first to finish before it subscribes to the second and so on.
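The one-at-a-time subscription described above can be seen in a small sketch. It assumes a recent RxSwift (5 or later); the names `log`, `first`, and `second` are made up for illustration and do not come from the question. A `PublishSubject` stands in for work that is still running, so the whole example runs synchronously.

```swift
import RxSwift

// Records the order in which things happen.
var log: [String] = []

// `first` stands in for async work that has not finished yet.
let first = PublishSubject<Int>()

// `second` records the moment something subscribes to it.
let second = Observable<Int>.create { observer in
    log.append("second subscribed")
    observer.onNext(2)
    observer.onCompleted()
    return Disposables.create()
}

let subscription = Observable.concat([first.asObservable(), second])
    .subscribe(onNext: { log.append("next \($0)") })

// At this point only `first` has been subscribed to.
log.append("first still running")

first.onNext(1)
first.onCompleted()   // only now does concat subscribe to `second`

subscription.dispose()
print(log)
// ["first still running", "next 1", "second subscribed", "next 2"]
```

Because `second` is not subscribed to until `first` completes, any work it starts on subscription cannot overlap with the work of `first`.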

RxJava has concatEager, which does what you want: it subscribes to all sources at the start while still preserving order. RxSwift does not seem to have an equivalent.

What you could do is zip each item with its index, flatMap, sort on index and unzip.
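A minimal sketch of the index-and-sort idea, assuming RxSwift 5 or later (where `toArray()` returns a `Single`). The `jobs` subjects are hypothetical stand-ins for the question's `doAsyncWork` observables; they are completed by hand in reverse order to imitate work that finishes out of order.

```swift
import RxSwift

// Hypothetical stand-ins for three pieces of async work.
let jobs = [PublishSubject<String>(), PublishSubject<String>(), PublishSubject<String>()]
var ordered: [String] = []

let subscription = Observable.from(jobs)
    .enumerated()                                  // pair each job with its index
    .flatMap { pair -> Observable<(Int, String)> in
        // flatMap subscribes to every inner observable as soon as it arrives,
        // so all jobs are running at the same time.
        let index = pair.index
        return pair.element.map { (index, $0) }
    }
    .toArray()                                     // wait until every job has completed
    .map { pairs -> [String] in
        // Restore the source order, then drop the index.
        pairs.sorted { $0.0 < $1.0 }.map { $0.1 }
    }
    .subscribe(onSuccess: { ordered = $0 })

// Finish the jobs in reverse order: last job first.
for (index, value) in [(2, "c"), (1, "b"), (0, "a")] {
    jobs[index].onNext(value)
    jobs[index].onCompleted()
}

subscription.dispose()
print(ordered)
// ["a", "b", "c"]
```

The trade-off in this sketch is that nothing is delivered until the slowest job has finished, and the results arrive as a single array rather than as individual elements.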

Upvotes: 0

solidcell

Reputation: 7729

This won't help you right now, but maybe it will help others in the future.

The operator you're looking for is called concatMap. However, at the moment, it doesn't exist in RxSwift.

There currently exists a closed PR for it here.

Upvotes: 0

Pierre

Reputation: 686

This seems to work, though I'm not sure it is the best answer.

import RxSwift

func delay(time: Int, closure: () -> Void) {
  dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
    dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
  return Observable.create() { (observer) -> Disposable in
    print(desc)
    delay(time) {
      observer.onNext(value)
      observer.onCompleted()
    }
    return NopDisposable.instance
  }
}

let seq = Observable
  .of(1, 2, 3, 4, 5)
  .map { (n) -> Observable<Int> in
    let o = doAsyncWork(n,
      desc: "start \(n) - wait \(5 - n)",
      time: 6 - n
    ).shareReplay(1)
    o.subscribe()
    return o.asObservable()
  }
  .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }

Upvotes: 2

solidcell

Reputation: 7729

Your "desired output" seems to disagree with your desire to have the Observables start "in parallel", yet delay their elements such that "5" has no delay, "4" has a 1 second delay, "3" has a 2 second delay, etc.

I would think you're looking for this output:

start 1 - wait 4
start 2 - wait 3
start 3 - wait 2
start 4 - wait 1
start 5 - wait 0
5
4
3
2
1

This is something you could use to do that:

Observable.range(start: 1, count: 5)
    .flatMap { n -> Observable<Int> in
        let waitInterval = 5 - n
        print("start \(n) - wait \(waitInterval)")
        return Observable.just(n)
            .delaySubscription(RxTimeInterval(waitInterval), scheduler: MainScheduler.instance)
    }
    .subscribeNext { i in
        print(i)
    }
    .addDisposableTo(disposeBag)

If you meant something else, you could probably easily tweak this snippet to accomplish your goal.

Upvotes: 1
