michaelk
michaelk

Reputation: 2127

Implementing a debounced buffer with RxSwift, is this correct?

I am relatively new to RxSwift, but I am looking forward to using it more in my projects and I would love to hear some feedback on an operator I just wrote.

The functionality I am missing is a debounced buffer: A buffer that behaves exactly like the debounce operator, but instead of emitting only the latest value it should emit all collected values since the last emission.

In RxJava this is easily achievable by using a buffer with another observable as a "closing selector":

// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure
//
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

In RxSwift though this version of the buffer operator doesn't exist (I think this issue is related: https://github.com/ReactiveX/RxSwift/issues/590), so I tried to solve the issue myself.


My first approach was just building the debounced buffer:

extension ObservableType {
    func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        var valueBuffer: [E] = []

        let observable = self.do(onNext: { (value) in
            valueBuffer.append(value)
        }, onError: { (error) in
            valueBuffer = []
        }, onCompleted: {
            valueBuffer = []
        }, onSubscribe: {
            valueBuffer = []
        }, onDispose: {
            valueBuffer = []
        }).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
            let emitValues = valueBuffer
            valueBuffer = []
            return Observable<[E]>.just(emitValues)
        }

        return observable
    }
}

My second approach was building the buffer which any closing condition (like the RxJava version):

extension ObservableType {
    func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E] = []

        return Observable.create { observer in
            let selectorSubscription = selector.subscribe(onNext: { (value) in
                let emitValues = valueBuffer
                valueBuffer = []
                observer.on(.next(emitValues))
            }, onError: { (error) in
                valueBuffer = []
                observer.on(.error(error))
            }, onCompleted: {
                valueBuffer = []
                observer.on(.completed)
            }, onDisposed: {
                valueBuffer = []
            })

            let subscription = self.subscribe(onNext: { (value) in
                valueBuffer.append(value)
            }, onError: { (error) in
                observer.on(.error(error))
                selectorSubscription.dispose()
            }, onCompleted: {
                observer.on(.completed)
                selectorSubscription.dispose()
            }, onDisposed: {
                observer.on(.completed)
                selectorSubscription.dispose()
            })
            return subscription
        }
    }
}

I have tested these two operators and they seem to work, also tested handling different combinations of onError, onDispose, and onCompleted events.

But I would still love to hear some feedback from more experienced people if this is at least an acceptable solution without leaks, and if I am violating any RX contracts.

I also created a pasterbin with some test code: http://pastebin.com/1iAbUPf8

Upvotes: 3

Views: 1666

Answers (1)

diegoperini
diegoperini

Reputation: 1806

Here is mine for buffer(bufferOpenings, bufferClosingSelector). It may require further review.

extension ObservableType {

    func buffer<R>(bufferOpenings: Observable<R>, bufferClosingSelector: (R)->Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E]? = nil

        let operatorObservable = Observable<[E]>.create({ observer in
            let subject = PublishSubject<[E]>()

            let closingsSub = bufferOpenings
                .doOnNext({ _ in
                    valueBuffer = []
                })
                .flatMap({ opening in
                    return bufferClosingSelector(opening)
                })
                .subscribeNext({ _ in
                    if let vb = valueBuffer {
                        subject.onNext(vb)
                    }
                    valueBuffer = nil
                }
            )

            let bufferSub = self.subscribe(
                onNext: { value in
                    valueBuffer?.append(value)
                },
                onError: { error in
                    subject.onError(error)
                },
                onCompleted: {
                    subject.onCompleted()
                },
                onDisposed: {
                }
            )

            let subjectSub = subject.subscribe(
                onNext: { (value) in
                    observer.onNext(value)
                },
                onError: { (error) in
                    observer.onError(error)
                },
                onCompleted: {
                    observer.onCompleted()
                },
                onDisposed: {
                }
            )

            let combinedDisposable = CompositeDisposable()

            combinedDisposable.addDisposable(closingsSub)
            combinedDisposable.addDisposable(bufferSub)
            combinedDisposable.addDisposable(subjectSub)

            return combinedDisposable

        })

        return operatorObservable
    }

}

Upvotes: 1

Related Questions