Reputation: 2127
I am relatively new to RxSwift, but I am looking forward to using it more in my projects and I would love to hear some feedback on an operator I just wrote.
The functionality I am missing is a debounced buffer: A buffer that behaves exactly like the debounce
operator, but instead of emitting only the latest value it should emit all collected values since the last emission.
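To make the intended behaviour concrete, here is a minimal sketch in plain Swift (no RxSwift involved) that models a debounced buffer on timestamped data. The helper name `debouncedBuffers` and the `(time:value:)` tuple shape are assumptions made for this illustration. It also assumes the events are sorted by time and that the source stays quiet after the last value.

```swift
/// Hypothetical helper (not part of RxSwift): groups timestamped values
/// the way a debounced buffer would emit them.
/// A buffer is closed once `dueTime` passes without a new value.
func debouncedBuffers<T>(_ events: [(time: Double, value: T)], dueTime: Double) -> [[T]] {
    var result: [[T]] = []
    var current: [T] = []
    var lastTime: Double? = nil
    for event in events {
        // A quiet gap of at least `dueTime` means the debounce timer
        // fired and closed the previous buffer.
        if let last = lastTime, event.time - last >= dueTime, !current.isEmpty {
            result.append(current)
            current = []
        }
        current.append(event.value)
        lastTime = event.time
    }
    // Assumption: the source stays quiet after the last value,
    // so the timer fires one final time.
    if !current.isEmpty {
        result.append(current)
    }
    return result
}

let bursts: [(time: Double, value: Int)] = [
    (time: 0.00, value: 1), (time: 0.01, value: 2), (time: 0.02, value: 3),
    (time: 0.50, value: 4), (time: 0.51, value: 5)
]
print(debouncedBuffers(bursts, dueTime: 0.1)) // [[1, 2, 3], [4, 5]]
```

A plain debounce on the same input would only emit 3 and 5; the debounced buffer emits every value of each burst.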
In RxJava this is easily achievable by using a buffer with another observable as a "closing selector":
// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure
//
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
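The closing-selector idea can also be modelled without any Rx library. The following is a rough sketch in plain Swift, where the helper name `buffers(_:closedAt:)` and the timestamped tuples are assumptions for illustration only: every closing signal emits whatever was collected since the previous one, which may be an empty array. Values arriving after the last closing signal are dropped here, which is a simplification.

```swift
/// Hypothetical helper (not an Rx API): models buffering with closing
/// signals on timestamped data. Assumes `events` is sorted by time.
func buffers<T>(_ events: [(time: Double, value: T)], closedAt closings: [Double]) -> [[T]] {
    var result: [[T]] = []
    var index = 0
    for closing in closings.sorted() {
        var current: [T] = []
        // Collect every value that arrived up to and including this closing signal.
        while index < events.count, events[index].time <= closing {
            current.append(events[index].value)
            index += 1
        }
        // Each closing signal emits one buffer, even if it is empty.
        result.append(current)
    }
    return result
}

let samples: [(time: Double, value: Int)] = [
    (time: 0.00, value: 1), (time: 0.01, value: 2), (time: 0.50, value: 3)
]
print(buffers(samples, closedAt: [0.11, 0.60]))       // [[1, 2], [3]]
print(buffers(samples, closedAt: [0.11, 0.20, 0.60])) // [[1, 2], [], [3]]
```

Using the debounced source itself as the closing signal, as in the RxJava snippet, produces the first result: one buffer per burst.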
In RxSwift though this version of the buffer operator doesn't exist (I think this issue is related: https://github.com/ReactiveX/RxSwift/issues/590), so I tried to solve the issue myself.
My first approach was just building the debounced buffer:
extension ObservableType {
    func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        var valueBuffer: [E] = []
        let observable = self.do(onNext: { (value) in
            valueBuffer.append(value)
        }, onError: { (error) in
            valueBuffer = []
        }, onCompleted: {
            valueBuffer = []
        }, onSubscribe: {
            valueBuffer = []
        }, onDispose: {
            valueBuffer = []
        }).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
            let emitValues = valueBuffer
            valueBuffer = []
            return Observable<[E]>.just(emitValues)
        }
        return observable
    }
}
My second approach was building a buffer with an arbitrary closing condition (like the RxJava version):
extension ObservableType {
    func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E] = []
        return Observable.create { observer in
            let selectorSubscription = selector.subscribe(onNext: { (value) in
                let emitValues = valueBuffer
                valueBuffer = []
                observer.on(.next(emitValues))
            }, onError: { (error) in
                valueBuffer = []
                observer.on(.error(error))
            }, onCompleted: {
                valueBuffer = []
                observer.on(.completed)
            }, onDisposed: {
                valueBuffer = []
            })
            let subscription = self.subscribe(onNext: { (value) in
                valueBuffer.append(value)
            }, onError: { (error) in
                observer.on(.error(error))
                selectorSubscription.dispose()
            }, onCompleted: {
                observer.on(.completed)
                selectorSubscription.dispose()
            }, onDisposed: {
                observer.on(.completed)
                selectorSubscription.dispose()
            })
            return subscription
        }
    }
}
I have tested these two operators and they seem to work. I also tested different combinations of onError, onDispose, and onCompleted events.
Still, I would love to hear from more experienced people whether this is at least an acceptable solution without leaks, and whether I am violating any Rx contracts.
I also created a Pastebin with some test code: http://pastebin.com/1iAbUPf8
Upvotes: 3
Views: 1666
Reputation: 1806
Here is mine for buffer(bufferOpenings, bufferClosingSelector). It may require further review.
extension ObservableType {
    func buffer<R>(bufferOpenings: Observable<R>, bufferClosingSelector: (R)->Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E]? = nil
        let operatorObservable = Observable<[E]>.create({ observer in
            let subject = PublishSubject<[E]>()
            let closingsSub = bufferOpenings
                .doOnNext({ _ in
                    valueBuffer = []
                })
                .flatMap({ opening in
                    return bufferClosingSelector(opening)
                })
                .subscribeNext({ _ in
                    if let vb = valueBuffer {
                        subject.onNext(vb)
                    }
                    valueBuffer = nil
                })
            let bufferSub = self.subscribe(
                onNext: { value in
                    valueBuffer?.append(value)
                },
                onError: { error in
                    subject.onError(error)
                },
                onCompleted: {
                    subject.onCompleted()
                },
                onDisposed: {
                }
            )
            let subjectSub = subject.subscribe(
                onNext: { (value) in
                    observer.onNext(value)
                },
                onError: { (error) in
                    observer.onError(error)
                },
                onCompleted: {
                    observer.onCompleted()
                },
                onDisposed: {
                }
            )
            let combinedDisposable = CompositeDisposable()
            combinedDisposable.addDisposable(closingsSub)
            combinedDisposable.addDisposable(bufferSub)
            combinedDisposable.addDisposable(subjectSub)
            return combinedDisposable
        })
        return operatorObservable
    }
}
Upvotes: 1