Reputation: 418
In Node.js I am trying to get the following behaviour: during the runtime of my Express application, I accumulate several IDs of objects that need further processing. For that processing, I need to transmit these IDs to a different service. The other service, however, cannot handle many individual requests and instead requires batch transmission. Hence I need to accumulate many individual requests into one bigger request while allowing persistence.
tl;dr: Over 15 minutes, several IDs are accumulated in my application; after this 15-minute window, they're all emitted at once. At the same time, the next window is opened.
From my investigation, this is probably the abstract data type of a multiset: the items in my multiset (or bag) can have duplicates (hence the multi-), they're packaged by the time window, but have no index.
My infrastructure is already using redis, but I am not sure whether there's a way to accumulate data into one single job. Or is there? Are there any other sensible ways to achieve this kind of behaviour?
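For illustration, a naive in-memory version of the behaviour I'm after would look roughly like this (processBatch stands in for the transmission to the other service; it lacks exactly the persistence I need):
var WINDOW_MS = 15 * 60 * 1000;
var currentWindow = [];

// called from anywhere in the Express app; duplicates are kept
// and there is no index -- multiset semantics
function collect(id) {
  currentWindow.push(id);
}

// every 15 minutes, emit the accumulated IDs and open the next window
setInterval(function () {
  var batch = currentWindow;
  currentWindow = [];
  if (batch.length > 0) {
    processBatch(batch); // batch transmission to the other service
  }
}, WINDOW_MS);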
Upvotes: 0
Views: 213
Reputation: 418
I came up with an npm module to solve this specific problem, using a MySQL database for persistence:
persistent-bag: This is to bags like redis is to queues. A bag (or multiset) is filled over time and processed at once.
On instantiation of the object, the required table is created in the provided MySQL database if necessary.
var PersistentBag = require('persistent-bag');
var bag = new PersistentBag({
  host: 'localhost',
  port: '3306',
  user: 'root',
  password: '',
  database: 'test'
});
Then items can be .add()ed during the runtime of any number of applications:
var item = {
  title: 'Test item to store to bag'
};

bag.add(item, function (err, itemId) {
  console.log('Item id: ' + itemId);
});
Working with the emitted, aggregated items every 15 minutes is done like in kue for redis, by subscribing to .process():
bag.process(function worker(bag, done) {
  // bag.data is now an array of all items
  doSomething(bag.data, function () {
    done();
  });
});
Upvotes: 1
Reputation: 6994
I might be misunderstanding some of the subtlety of your particular situation, but here goes.
Here is a simple sketch of some code that processes a batch of 10 items at a time. The way you would do this differs slightly depending on whether the processing step is synchronous or asynchronous. You don't need anything more complicated than an array for this, since arrays have a constant-time push method and a constant-time length property, and those are the only operations you need. You may also want an option to flush the batch after a given interval has elapsed, even if it isn't full yet; a sketch of this follows the examples.
Synchronous example:
var batch = [];
var batchLimit = 10;

var sendItem = function (item) {
  batch.push(item);
  // flush once the batch is full
  if (batch.length >= batchLimit) {
    processBatchSynch(batch);
    batch = [];
  }
};
Asynchronous example:
// note that in this case the job of emptying the batch array
// has to be done inside the callback.
var batch = [];
var batchLimit = 10;

// your callback might look something like function (err, data) { ... }
var sendItem = function (item, cb) {
  batch.push(item);
  if (batch.length >= batchLimit) {
    processBatchAsync(batch, function (err, data) {
      batch = []; // empty the batch once processing has finished
      cb(err, data);
    });
  }
};
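And here is a minimal sketch of the timed-flush option mentioned above, assuming the same hypothetical processBatchAsync as before; the batch is flushed either when it is full or when the 15-minute window from the question elapses, whichever comes first:
var batch = [];
var batchLimit = 10;
var flushInterval = 15 * 60 * 1000; // the question's 15-minute window

var flush = function (cb) {
  if (batch.length === 0) return;
  var toProcess = batch;
  batch = []; // open the next batch before processing starts
  processBatchAsync(toProcess, cb || function () {});
};

var sendItem = function (item, cb) {
  batch.push(item);
  if (batch.length >= batchLimit) {
    flush(cb);
  }
};

// flush whatever has accumulated at the end of each interval
setInterval(flush, flushInterval);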
Upvotes: 1