Santosh Joseph

Reputation: 13

How to create a queue using RxJS

I'm trying to figure out how queueScheduler works in RxJS (version 6.2.2). I'm running an Express server that should accept multiple requests, but the function processMetricRequest2 should only process one item at a time. With the code below, when I access /test1 twice in a row, processMetricRequest2 is called a second time even though the first call hasn't finished (the webMetrics function takes a few seconds). Any ideas on what I'm doing wrong? Thanks!

router.get('/test1', function(req, res, next) {
  let fn = partial(processMetricRequest2, req.query.input);
  queueScheduler.schedule(fn);
  res.render('index', { id: 1, current: {url: req.query.input}});
});
async function processMetricRequest2(url, arg) {
  console.log('--processing:', url);
  let result = await webMetrics(url);
  console.log('--FINISHED: ', url);
  return result;
}

Upvotes: 1

Views: 1182

Answers (1)

m1ch4ls

Reputation: 3425

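You probably want a queue that processes requests serially. That can be done by pushing each request into a Subject and piping it through concatMap with your processing function: concatMap subscribes to the next inner result only after the previous one has completed, so only one request is in flight at a time. queueScheduler serves a different purpose: it controls the order in which tasks scheduled from within another running task are executed, and it does not wait for the promise returned by an async function.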

const { Subject } = require('rxjs');
const { concatMap } = require('rxjs/operators');

const queue = new Subject();

router.get('/test1', function(req, res, next) {
  queue.next(req.query.input);
  res.render('index', { id: 1, current: {url: req.query.input}});
});

async function processMetricRequest2(url) {
  console.log('--processing:', url);
  let result = await webMetrics(url);
  console.log('--FINISHED: ', url);
  return result;
}

queue
  .pipe(concatMap(processMetricRequest2))
  .subscribe();

Note that if processMetricRequest2 throws or its promise rejects, the stream errors, the subscription terminates, and the queue stops processing further requests. Make sure you handle errors inside processMetricRequest2.
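One way to keep the queue alive is to catch failures inside the processing function, so the promise it returns never rejects. The sketch below is only an illustration under assumptions: webMetrics here is a hypothetical stand-in for the real function from the question (it rejects for the input 'bad'), and the { url, result } / { url, error } return shape is an invented convention, not something RxJS requires.

```javascript
// Hypothetical stand-in for the question's webMetrics(); it rejects for one
// input so the error path can be exercised.
async function webMetrics(url) {
  if (url === 'bad') {
    throw new Error('metrics failed for ' + url);
  }
  return { url: url, score: 1 };
}

// Catches any failure and resolves with an { error } object instead of
// rejecting, so a stream built on this function is never errored by it.
async function processMetricRequest2(url) {
  console.log('--processing:', url);
  try {
    const result = await webMetrics(url);
    console.log('--FINISHED: ', url);
    return { url: url, result: result };
  } catch (err) {
    // Assumed convention: report the failure as a value and move on.
    console.error('--FAILED: ', url, err.message);
    return { url: url, error: err.message };
  }
}
```

With this version, queue.pipe(concatMap(processMetricRequest2)).subscribe() should keep handling later requests after a failed one, because the stream only ever sees resolved promises. The subscriber can check for the error property if it needs to react to failures.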

Upvotes: 1
