Reputation: 3764
I'm looking for an operator (or a chain of operators) that, like flatMapFirst, discards items while waiting for the current observable to complete, and then, once it completes, immediately spawns another stream for the latest item seen.
I hope this diagram explains it better:
x -> [X1,X2,X3]
Input: -a-----------b---c--d-------------------------
Output: ---A1-A2-A3----B1--B2--B3--D1-D2-D3-----------
As you can see, c got skipped, since d arrived after it. But notice as well that d arrived while b was still being processed, and as soon as b was done, d started being processed.
This operator allows me to finish computations, as opposed to flatMapLatest, which might switch from observable to observable without ever giving a complete result for a computation. It behaves like flatMapFirst, but the last element is still processed, which leaves the output in a consistent state during idle periods when the input stream has no elements.
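To pin down the rule described above, here is a minimal sketch in plain Swift with no RxSwift involved. It only models which inputs get started, not the inner streams themselves. The names LatestPendingGate, Event and startedItems are made up for illustration and are not part of any library.

```swift
// Hypothetical model of the requested policy: while an inner stream is
// running, remember only the newest input; start it when the stream ends.
struct LatestPendingGate<Item> {
    var isBusy = false
    var pending: Item? = nil

    // An input element arrived. Returns the element to start now, if any.
    mutating func receive(_ item: Item) -> Item? {
        if isBusy {
            pending = item          // overwrites any older pending element
            return nil
        }
        isBusy = true
        return item
    }

    // The running inner stream completed. Returns the next element to start.
    mutating func finish() -> Item? {
        let next = pending
        pending = nil
        isBusy = (next != nil)
        return next
    }
}

enum Event {
    case arrive(String)   // the input emitted an element
    case finish           // the current inner stream completed
}

// Replays a list of events and records the order in which elements start.
func startedItems(for events: [Event]) -> [String] {
    var gate = LatestPendingGate<String>()
    var started: [String] = []
    for event in events {
        let next: String?
        switch event {
        case .arrive(let item): next = gate.receive(item)
        case .finish: next = gate.finish()
        }
        if let next = next { started.append(next) }
    }
    return started
}

// Same shape as the diagram: c and d arrive while b is still running.
let order = startedItems(for: [
    .arrive("a"), .finish,
    .arrive("b"), .arrive("c"), .arrive("d"), .finish,
    .finish,
])
print(order) // ["a", "b", "d"]
```

c never starts because d overwrote it in the single pending slot, which matches the output line in the diagram.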
concatMap could be the answer here, but if too many items arrive between b and d, the output stream would delay the latest item's computation far too long.
flatMapFirst (d is discarded)
Input: -a-----------b---c--d----------------
Output: ---A1-A2-A3----B1--B2--B3------------
flatMapLatest (periods of starvation can happen)
Input: -abcdefghijklmnop-----
Output: -------------------P1-
concatMap (too much work can get scheduled)
Input: -a-----------b-c-d-e-f------------------------------------------
Output: ---A1-A2-A3----B1--B2--B3-C1-C2-C3--D1-D2-D3-E1-E2-E3--F1-F2-F3-
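The difference between these operators comes down to what happens to elements that arrive while an inner stream is still running. The following plain-Swift sketch (no RxSwift, hypothetical names BusyPolicy, InputEvent and startOrder) replays the burst from the concatMap diagram under three buffering rules. flatMapLatest is left out because it cancels the running stream rather than buffering.

```swift
// Hypothetical names throughout; this models scheduling only, not RxSwift.
enum BusyPolicy {
    case dropNew      // like flatMapFirst: ignore elements that arrive while busy
    case queueAll     // like concatMap: remember every element, in order
    case keepLatest   // the behaviour asked for: remember only the newest element
}

enum InputEvent {
    case arrive(String)   // the source emitted an element
    case innerCompleted   // the inner stream currently running finished
}

// Returns the order in which elements would be turned into inner streams.
func startOrder(of events: [InputEvent], policy: BusyPolicy) -> [String] {
    var busy = false
    var waiting: [String] = []
    var started: [String] = []

    for event in events {
        switch event {
        case .arrive(let item):
            if !busy {
                busy = true
                started.append(item)
            } else {
                switch policy {
                case .dropNew: break
                case .queueAll: waiting.append(item)
                case .keepLatest: waiting = [item]
                }
            }
        case .innerCompleted:
            if waiting.isEmpty {
                busy = false
            } else {
                started.append(waiting.removeFirst())
            }
        }
    }
    return started
}

// a runs alone, then b..f arrive in a burst while b is still running.
let burst: [InputEvent] = [
    .arrive("a"), .innerCompleted,
    .arrive("b"), .arrive("c"), .arrive("d"), .arrive("e"), .arrive("f"),
    .innerCompleted, .innerCompleted, .innerCompleted, .innerCompleted, .innerCompleted,
]

print(startOrder(of: burst, policy: .dropNew))     // ["a", "b"]
print(startOrder(of: burst, policy: .queueAll))    // ["a", "b", "c", "d", "e", "f"]
print(startOrder(of: burst, policy: .keepLatest))  // ["a", "b", "f"]
```

With keepLatest the work after b is bounded to a single extra computation, no matter how many elements arrived in the burst.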
Upvotes: 1
Views: 280
Reputation: 33979
When in doubt, you can always write your own operator. Note that I have only tested the happy path of the code below, and it has no threading guards:
extension ObservableType {
    public func specialOp<O>(_ selector: @escaping (Self.E) throws -> O) -> RxSwift.Observable<O.E> where O : ObservableConvertibleType {
        return Observable<O.E>.create { result in
            var bag: DisposeBag! = DisposeBag()
            var current: Observable<O.E>? = nil   // inner observable currently running
            var last: E? = nil                    // latest element seen while busy
            var innerComplete: Bool = false       // true once the source has completed

            // Starts the inner observable for `element` and forwards its events.
            func handleSub(_ element: E) {
                do {
                    current = try selector(element).asObservable()
                    current!.subscribe { subEvent in
                        switch subEvent {
                        case .completed:
                            current = nil
                            if let next = last {
                                // Clear the slot first so an inner observable that
                                // completes synchronously cannot pick up `next` again.
                                last = nil
                                handleSub(next)
                            }
                            else if innerComplete {
                                result.onCompleted()
                                bag = nil
                            }
                        case .error(let error):
                            result.onError(error)
                        case .next(let subElement):
                            result.onNext(subElement)
                        }
                    }.disposed(by: bag)
                }
                catch {
                    result.onError(error)
                }
            }

            self.subscribe { event in
                switch event {
                case .completed:
                    innerComplete = true
                    // Nothing is running and nothing is pending, so finish right away.
                    if current == nil {
                        result.onCompleted()
                    }
                case .error(let error):
                    result.onError(error)
                case .next(let element):
                    if current == nil {
                        handleSub(element)
                    }
                    else {
                        // Busy: keep only the newest element, dropping any older one.
                        last = element
                    }
                }
            }.disposed(by: bag)

            return Disposables.create {
                bag = nil
            }
        }
    }
}
Upvotes: 1