AngularNerd

Reputation: 991

Implementing debouncing batching queue with RxJS

I'm trying to understand whether RxJS would be a good fit for the problem this Node module solves: https://github.com/ericdolson/debouncing-batch-queue

Its description says: "A queue which will emit and clear its contents when its size or timeout is reached. Ideal for aggregating data for bulk apis where batching in a timely manner is best. Or anything really where batching data is needed."

If so, could someone walk me through how to implement the simple example in this npm module with RxJS? Ideally with ES5 if possible.

Upvotes: 0

Views: 905

Answers (2)

concat

Reputation: 3187

There's an operator for that: bufferWithTimeOrCount. If you need it to be truly equivalent, the input stream would be a Subject, with groupBy for namespaces, like the following:

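var dbbq$ = new Subject();
// Each value is [data, namespace]; group on the namespace (undefined for the default one).
dbbq$.groupBy(function(v_ns) { return v_ns[1]; })
     .flatMap(function(S) {
       // RxJS 4 name; in RxJS 5 the equivalent is S.bufferTime(1000, null, 2)
       return S.bufferWithTimeOrCount(1000, 2);
     })
     // Nothing is emitted until something subscribes.
     .subscribe(function(batch) { console.log(batch); });
dbbq$.next([ 'ribs 0' ]);  // onNext in RxJS 4
dbbq$.next([ 'more ribs', 'bbq1' ]);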

// is analogous to

var dbbq = new DBBQ(1000, 2);

dbbq.add('ribs 0');
dbbq.add('more ribs', 'bbq1');
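
For reference, here is a rough ES5 sketch, without RxJS, of the size-or-timeout behaviour that both snippets try to reproduce. It is not the module's source: createBatchQueue, onBatch and the 'default' namespace key are names made up for illustration, and it assumes the timer restarts on every add, which is how I read "debouncing".

```javascript
// Hypothetical helper for illustration only; not the API of
// debouncing-batch-queue or of RxJS.
function createBatchQueue(timeoutMs, maxSize, onBatch) {
  // namespace key -> { items: [...], timer: timeout id or null }
  var pending = Object.create(null);

  // Emit and clear whatever is queued for one namespace.
  function flush(key) {
    var entry = pending[key];
    if (!entry) { return; }
    clearTimeout(entry.timer);
    delete pending[key];
    onBatch(entry.items, key);
  }

  function add(item, namespace) {
    // Assumption: items added without a namespace share the key 'default'.
    var key = namespace === undefined ? 'default' : namespace;
    var entry = pending[key] || (pending[key] = { items: [], timer: null });
    entry.items.push(item);

    // Debounce: every add cancels the previous timer for this namespace.
    clearTimeout(entry.timer);

    if (entry.items.length >= maxSize) {
      // Size limit reached: emit immediately.
      flush(key);
    } else {
      // Otherwise emit once the namespace has been quiet for timeoutMs.
      entry.timer = setTimeout(function () { flush(key); }, timeoutMs);
    }
  }

  return { add: add, flush: flush };
}
```

With createBatchQueue(1000, 2, callback), two add calls in the same namespace invoke the callback right away, while a single add invokes it after one quiet second. In the RxJS version, groupBy plays the role of the per-namespace map and the buffering operator plays the role of the size check plus timer.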

Upvotes: 2

ZahiC

Reputation: 14687

No way I'm doing this with ES5 :)

const dataWithNamespace = (data, namespace) => ({data, namespace});

const source = [
  dataWithNamespace('ribs 0'),
  dataWithNamespace('ribs 1'),
  dataWithNamespace('ribs 2'),
  dataWithNamespace('ribs 3'),
  dataWithNamespace('ribs 4'),
  dataWithNamespace('more ribs', 'bbq1'),
  dataWithNamespace('more ribs', 'bbq1'),
  dataWithNamespace('brisket', 'best bbq namespace')
];

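const DBBQ = (debounceTimeout, maxBatchSize) =>
  source$ => source$
    // One inner stream per namespace.
    .groupBy(x => x.namespace)
    .mergeMap(grouped$ => grouped$
      // Emit each item, then an `undefined` marker after debounceTimeout;
      // switchMap cancels the pending marker when a newer item arrives first.
      .switchMap(x =>
        Rx.Observable.of(x.data)
          .concat(Rx.Observable.of(undefined)
            .delay(debounceTimeout)
          )
      )
      // Note that the marker itself occupies a slot in the buffer.
      .bufferCount(maxBatchSize)
      // Ignore a short leftover buffer emitted on completion.
      .filter(batch => batch.length === maxBatchSize)
      // Strip the timeout markers so only real data is emitted.
      .map(batch => batch.filter(item => item !== undefined))
    );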

const source$ = Rx.Observable.from(source);
  
DBBQ(1000, 2)(source$).subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

Upvotes: 1
