Gonzalo
Gonzalo

Reputation: 3764

flatMapFirst, but delaying the latest item

Im looking for an operator (or a chain of operators) that, like flatMapFirst, discards items while waiting for the current observable to complete, which when it does, immediately spans another stream for the latest item seen.

I hope this diagram explains it better:

x -> [X1,X2,X3]

Input:  -a-----------b---c--d-------------------------
Output: ---A1-A2-A3----B1--B2--B3--D1-D2-D3-----------

As you can see, c got skipped, since d arrived after it. But notice as well that d arrived while b was still being processed and as soon as b was done, d started being processed.

This operator allows me to finish computations, as opposed to flatMapLatest which might switch from observable to observable, without ever giving a complete result for a computation. It behaves like flatMapFirst but the last element is still processed, allowing a consistent state in idle periods when the input elements stream does not have any elements.

concatMap could be the answer here, but if too many items get between b and d, the output stream would delay the latest item's computation far too long.

flatMapFirst (d is discarded)

Input:  -a-----------b---c--d----------------
Output: ---A1-A2-A3----B1--B2--B3------------

flatMapLatest (periods of starvation can happen)

Input:  -abcdefghijklmnop-----
Output: -------------------P1-

concatMap (too much work can get scheduled)

Input:  -a-----------b-c-d-e-f------------------------------------------
Output: ---A1-A2-A3----B1--B2--B3-C1-C2-C3--D1-D2-D3-E1-E2-E3--F1-F2-F3-

Upvotes: 1

Views: 280

Answers (1)

Daniel T.
Daniel T.

Reputation: 33979

When in doubt, you can always write your own operator. (Note that in the below, I have only tested the happy path and didn't put in any threading guards.):

extension ObservableType {
    public func specialOp<O>(_ selector: @escaping (Self.E) throws -> O) -> RxSwift.Observable<O.E> where O : ObservableConvertibleType {
        return Observable<O.E>.create { result in
            var bag: DisposeBag! = DisposeBag()
            var current: Observable<O.E>? = nil
            var last: E? = nil
            var innerComplete: Bool = false
            func handleSub(_ element: E) {
                do {
                    current = try selector(element).asObservable()
                    current!.subscribe { subEvent in
                        switch subEvent {
                        case .completed:
                            current = nil
                            if let next = last {
                                handleSub(next)
                                last = nil
                            }
                            else if innerComplete {
                                result.onCompleted()
                                bag = nil
                            }
                        case .error(let error):
                            result.onError(error)
                        case .next(let sumElement):
                            result.onNext(sumElement)
                        }
                    }.disposed(by: bag)
                }
                catch {
                    result.onError(error)
                }
            }
            self.subscribe { event in
                switch event {
                case .completed:
                    innerComplete = true
                case .error(let error):
                    result.onError(error)
                case .next(let element):
                    if current == nil {
                        handleSub(element)
                    }
                    else {
                        last = element
                    }
                }
            }.disposed(by: bag)
            return Disposables.create {
                bag = nil
            }
        }
    }
}

Upvotes: 1

Related Questions