Reputation: 991
I'm trying to understand whether RxJS would be a good fit for the problem this Node module solves: https://github.com/ericdolson/debouncing-batch-queue
Its description says: "A queue which will emit and clear its contents when its size or timeout is reached. Ideal for aggregating data for bulk apis where batching in a timely manner is best. Or anything really where batching data is needed."
If so, could someone walk me through how to implement the simple example in this npm module with RxJS? Ideally with ES5 if possible.
Upvotes: 0
Views: 905
Reputation: 3187
There's an operator for that™: bufferWithTimeOrCount. If you need it to be truly equivalent, the input stream would be a Subject, with groupBy for namespaces, like the following:
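// RxJS 4 operator names; in RxJS 5 the rough equivalent of
// bufferWithTimeOrCount(1000, 2) is bufferTime(1000, null, 2).
var dbbq$ = new Rx.Subject();
dbbq$.groupBy(function(v_ns) { return v_ns[1]; }) // key = namespace (undefined for the default one)
  .flatMap(function(S) {
    return S.bufferWithTimeOrCount(1000, 2);
  })
  .subscribe(function(batch) { console.log(batch); }); // subscribe before pushing values
dbbq$.onNext([ 'ribs 0' ]);
dbbq$.onNext([ 'more ribs', 'bbq1' ]);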
// is analogous to
var dbbq = new DBBQ(1000, 2);
dbbq.add('ribs 0');
dbbq.add('more ribs', 'bbq1');
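To make the target behavior concrete without any library, here is a minimal ES5 sketch of a "flush on size or timeout" queue keyed by namespace. It is not the module's implementation or an RxJS API: `createBatchQueue`, `onBatch`, `flush` and the `'default'` namespace key are hypothetical names, and restarting the timer on every `add` is an assumption based on the word "debouncing" in the module name.

```javascript
// Hypothetical helper, not the API of debouncing-batch-queue or RxJS.
// Emits a namespace's items when maxSize is reached, or when timeoutMs
// passes without a new add for that namespace (assumed debounce behavior).
function createBatchQueue(timeoutMs, maxSize, onBatch) {
  var queues = Object.create(null); // key -> { items: [], timer: id }

  // Items added without a namespace share the 'default' key (an assumption;
  // it would collide with a caller-supplied namespace named 'default').
  function keyOf(namespace) {
    return namespace === undefined ? 'default' : namespace;
  }

  function flush(namespace) {
    var key = keyOf(namespace);
    var q = queues[key];
    if (!q) { return; }      // nothing queued for this namespace
    clearTimeout(q.timer);   // cancel the pending timeout, if any
    delete queues[key];
    onBatch(q.items, key);
  }

  function add(item, namespace) {
    var key = keyOf(namespace);
    var q = queues[key] || (queues[key] = { items: [], timer: null });
    q.items.push(item);
    clearTimeout(q.timer);   // restart the timer on every add
    if (q.items.length >= maxSize) {
      flush(key);
    } else {
      q.timer = setTimeout(function () { flush(key); }, timeoutMs);
    }
  }

  return { add: add, flush: flush };
}

var batches = [];
var queue = createBatchQueue(1000, 2, function (items, namespace) {
  batches.push({ namespace: namespace, items: items });
});
queue.add('ribs 0');
queue.add('ribs 1');            // size reached: this batch is emitted immediately
queue.add('more ribs', 'bbq1'); // below maxSize: waits for the timeout or a flush
queue.flush('bbq1');            // force it out now and cancel the timer
```

After this runs, `batches` holds one full batch for the default namespace and one single-item batch for `bbq1`. The manual `flush` is only there so the example finishes without waiting a second; left alone, the timer would emit the same batch.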
Upvotes: 2
Reputation: 14687
No way I'm doing this with ES5 :)
const dataWithNamespace = (data, namespace) => ({data, namespace});
const source = [
dataWithNamespace('ribs 0'),
dataWithNamespace('ribs 1'),
dataWithNamespace('ribs 2'),
dataWithNamespace('ribs 3'),
dataWithNamespace('ribs 4'),
dataWithNamespace('more ribs', 'bbq1'),
dataWithNamespace('more ribs', 'bbq1'),
dataWithNamespace('brisket', 'best bbq namespace')
];
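const DBBQ = (debounceTimeout, maxBatchSize) =>
  source$ => source$
    .groupBy(x => x.namespace)
    .mergeMap(grouped$ => grouped$
      // each item is followed by an `undefined` marker after debounceTimeout,
      // unless a newer item arrives first (switchMap cancels the pending marker)
      .switchMap(x =>
        Rx.Observable.of(x.data)
          .concat(Rx.Observable.of(undefined)
            .delay(debounceTimeout)
          )
      )
      // collect maxBatchSize values (items and markers) per buffer
      .bufferCount(maxBatchSize)
      // drop the incomplete buffer emitted when the group completes
      .filter(x => x.length == maxBatchSize)
      // strip the timeout markers before emitting
      .map(x => x.filter(x => x !== undefined))
    );
const source$ = Rx.Observable.from(source);
DBBQ(1000, 2)(source$).subscribe(console.log)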
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
Upvotes: 1