pberden

Reputation: 130

How to buffer observables in pairs and execute them by the pair?

I have an XHR request that returns an array, and I use that array to make subsequent XHR requests, like so:

const Rx = require('rxjs/Rx');
const fetch = require('node-fetch');

const url = `url`;

// Get array of tables
const tables$ = Rx.Observable
  .from(fetch(url).then((r) => r.json()));

// Get array of columns
const columns$ = (table) => {
  return Rx.Observable
    .from(fetch(`${url}/${table.TableName}/columns`).then(r => r.json()));
};

tables$
  .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$)))    
  .subscribe(val => console.log(val));

I would like to execute the column requests in chunks so that they are not all sent to the server at once.

This SO question goes in a similar direction but does not quite cover my case: Rxjs: Chunk and delay stream?

Now I'm trying something like this:

tables$
  .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$)))
  .flatMap(e => e)
  .bufferCount(4)
  .executeTheChunksSerial(magic)
  .flatMap(e => e)
  .subscribe(val => console.log(val));

But I cannot wrap my head around how to execute the chunks in series...

Upvotes: 0

Views: 718

Answers (1)

Mark van Straten

Reputation: 9425

You can use the concurrency argument of mergeMap to cap the number of requests that are in flight to your server at any one time:

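// Stand-in for the tables request: resolves with an array of tables.
const getTables = Promise.resolve([{ tableName: 'foo' }, { tableName: 'bar' }, { tableName: 'baz' }]);
// Stand-in for the columns request: emits its result after a 250 ms delay.
const getColumns = (table) => Rx.Observable.of('a,b,c')
  .do(_ => console.log('getting columns for table: ' + table))
  .delay(250);

Rx.Observable.from(getTables)
  .mergeAll() // flatten the array into one emission per table
  .mergeMap(
    table => getColumns(table.tableName),
    (table, columns) => ({ table, columns }), // pair each table with its columns
    2) // at most 2 inner observables are subscribed at a time
  .subscribe(console.log);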
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.js"></script>
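
For comparison, the strict chunk-by-chunk behaviour described in the question can be sketched with plain promises, without RxJS. This is only an illustration under stated assumptions: chunk, runInChunks, and fakeGetColumns are hypothetical names that are not part of RxJS or of the original code, and fakeGetColumns merely simulates a request with a timer. Unlike the concurrency argument, which starts a new request as soon as any running one finishes, each chunk here waits for its slowest request before the next chunk starts.

```javascript
// Split an array into consecutive groups of at most `size` items.
function chunk(items, size) {
  const groups = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// Run `task` for every item, one chunk at a time: all tasks in a chunk
// start together, and the next chunk starts only after every task in
// the previous chunk has resolved. Results keep the input order.
async function runInChunks(items, size, task) {
  const results = [];
  for (const group of chunk(items, size)) {
    const groupResults = await Promise.all(group.map(task));
    results.push(...groupResults);
  }
  return results;
}

// Hypothetical stand-in for the real columns request.
const fakeGetColumns = (table) =>
  new Promise((resolve) =>
    setTimeout(() => resolve({ table: table.tableName, columns: 'a,b,c' }), 50));

const tables = [{ tableName: 'foo' }, { tableName: 'bar' }, { tableName: 'baz' }];

// Requests for 'foo' and 'bar' run together; 'baz' starts once both are done.
runInChunks(tables, 2, fakeGetColumns).then((result) => console.log(result));
```

If one request in a chunk rejects, Promise.all rejects and the remaining chunks are never started, so real code would likely need its own error handling per request.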

Upvotes: 3
