Pablo Sanchez Gomez
Pablo Sanchez Gomez

Reputation: 1508

RxSwift Skip Events Until Own Sequence has finished

I have one observable (we will call it trigger) that can emit many times in a short period of time. When it emits I am doing a network Request and the I am storing the result with the scan Operator.

My problem is that I would like to wait until the request is finished to do it again. (But as it is now if trigger emits 2 observables it doesn't matter if fetchData has finished or not, it will do it again)

Bonus: I also would like to take only the first each X seconds (Debounce is not the solution because it can be emitting all the time and I want to get 1 each X seconds, it isn't throttle neither because if an observable emits 2 times really fast I will get the first and the second delayed X seconds)

The code:

trigger.flatMap { [unowned self] _ in
        self.fetchData()
        }.scan([], accumulator: { lastValue, newValue in
        return lastValue + newValue
    })

and fetchData:

func fetchData() -> Observable<[ReusableCellVMContainer]>

trigger:

let trigger = Observable.of(input.viewIsLoaded, handle(input.isNearBottomEdge)).merge() 

Upvotes: 0

Views: 2077

Answers (1)

Daniel T.
Daniel T.

Reputation: 33967

I'm sorry, I misunderstood what you were trying to accomplish in my answer below.

The operator that will achieve what you want is flatMapFirst. This will ignore events from the trigger until the fetchData() is complete.

trigger
    .flatMapFirst { [unowned self] _ in
        self.fetchData()
    }
    .scan([], accumulator: { lastValue, newValue in
        return lastValue + newValue
    })

I'm leaving my previous answer below in case it helps (if anything, it has the "bonus" answer.)


The problem you are having is called "back pressure" which is when the observable is producing values faster than the observer can handle.

In this particular case, I recommend that you don't restrict the data fetch requests and instead map each request to a key and then emit the array in order:

trigger
    .enumerated()
    .flatMap { [unowned self] count, _ in
        Observable.combineLatest(Observable.just(count), self.fetchData())
    }
    .scan(into: [Int: Value](), accumulator: { lastValue, newValue in
        lastValue[newValue.0] = newValue.1
    })
    .map { $0.sorted(by: { $0.key < $1.key }).map { $0.value }}

To make the above work, you need this:

extension ObservableType {
    func enumerated() -> Observable<(Int, E)> {
        let shared = share()
        let counter = shared.scan(0, accumulator: { prev, _ in return prev + 1 })
        return Observable.zip(counter, shared)
    }
}

This way, your network requests are starting ASAP but you aren't loosing the order that they are made in.


For your "bonus", the buffer operator will do exactly what you want. Something like:

trigger.buffer(timeSpan: seconds, count: Int.max, scheduler: MainScheduler.instance)
    .map { $0.first }

Upvotes: 1

Related Questions