Inkling

Reputation: 3782

Node.js: Parallel processing with sequential + async input/output

I'm trying to build an efficient asynchronous processing pipeline in Node.js for an array of potentially tens of thousands of items. The pipeline starts with a call to a web API (using the node-fetch package), goes through multiple parsing/transformation steps, and concludes by appending to a file on disk.

However, there are some requirements that, taken together, make this difficult:

  1. The web API allows only a limited number of requests per minute, so I have to be able to throttle / set a delay between the initial fetch calls. That means this stage must be asynchronously sequential.

  2. All results are written to the same file and must be appended in the same order as the initial array, so this stage must also be sequential.

  3. Otherwise, for general performance, things should run in parallel as much as possible. Examples:

    a. Earlier items should be able to be processed all the way through, including the file-writing step (assuming requirement 2 is met), while later items haven't been fetched yet (due to the throttling in point 1).

    b. Later items should only be delayed by earlier items (for instance, one with a particularly large API response body or a particularly long parse time) at the final file-writing step (in order to meet requirement 2). There should be no order dependency between items for the intermediate steps.

It should be noted that I'm using Node 10, so I do have async iterators / for await available. My closest attempt looks something like this (pretend it's in an async function context):

const valuePromises = [];

// Kick off one throttled fetch per item; each item's chain
// (fetch -> step1 -> step2 -> step3) then runs independently.
const delaySequence = itemArray.reduce(async (sequence, item) => {
  await sequence;  // wait for the previous iteration's delay
  const valuePromise = fetch(item.url)
    .then(step1)
    .then(step2)
    .then(step3);
  valuePromises.push(valuePromise);
  return sleep(1000);  // promisified setTimeout
}, Promise.resolve());

// If I don't do this, the valuePromises array won't be fully populated:
await delaySequence;

for await (const value of valuePromises) {
  await appendToFile(value);
}
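Here sleep is just the promisified setTimeout mentioned in the comment:

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));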

This works, except that it violates point "a" above: it has to wait until all the fetches have been fired before it can start appending to the file.

I've tried playing around with async generators but wasn't able to come up with anything better.

I've thought about using streams, which seem suited to this kind of task; they would solve the order problem (first in, first out) and allow some degree of parallelism. However, they have the limitation that an item can't pass through an intermediate stage in the pipeline before an earlier item has, violating "b". I also don't know how easy it is to get streams to interface with promise-based APIs.
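For reference, my understanding is that wrapping one of the promise-based steps (step1 from above) in a Transform would look something like the sketch below, and it still processes items strictly one at a time, which is exactly the limitation I mean:

const { Transform } = require('stream');

const step1Stream = new Transform({
  objectMode: true,
  transform(item, encoding, callback) {
    // Bridge the promise-based step into the stream callback API.
    step1(item)
      .then(result => callback(null, result))
      .catch(callback);
  }
});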

Does anyone know how this can be achieved?

Upvotes: 2

Views: 1510

Answers (2)

Bravo

Reputation: 6264

I think this will do what you want: a separate promise "chain" for the appendToFile sequence.

let writeSequence = Promise.resolve();

const delaySequence = itemArray.reduce(async (sequence, item) => {
  await sequence;
  const valuePromise = fetch(item.url)
    .then(step1)
    .then(step2)
    .then(step3);
  // Queue this item's append behind all earlier appends; it runs as soon
  // as both the previous write and this item's value are ready.
  writeSequence = writeSequence.then(() => valuePromise.then(appendToFile));
  return sleep(1000);  // promisified setTimeout
}, Promise.resolve());
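To know when everything (including the last append) has finished, and to surface any errors, await both chains at the end - first the fetch chain, so that writeSequence has been fully built up, then the write chain itself:

await delaySequence;
await writeSequence;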

Sorry, I had a stray invalid await in there - it's gone now.

Upvotes: 1

Michał Karpacki

Reputation: 2658

I had the same problem, so I decided to create my own framework to make such transforms simple. It's called scramjet and it does exactly what you need.

Your code would look something like this:

const { DataStream } = require("scramjet");
const fs = require("fs");

DataStream.from(itemArray)
    .setOptions({maxParallel: 8}) // this many executions will run in parallel
    .map(item => fetch(item.url)) // promises are automatically resolved
    .map(step1)                   // map works like on an array
    .map(step2)
    .map(step3)
    .map(async x => (await sleep(1000), x)) // delay each item by 1 s
    .stringify(someSerializer)    // I guess you would stringify your data here
    .pipe(fs.createWriteStream(path, {flags: "a+"}))
;

And this is your whole program.

Scramjet takes care of generating a chain of promises where possible, but exposes it through a transform stream interface - so you can pipe it straight to a file somewhere, or even stream directly to S3.
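If you need to know when the file write has completed, pipe returns the destination stream, so you can listen for its standard finish event (plain Node streams API, nothing scramjet-specific):

DataStream.from(itemArray)
    // ... the same pipeline as above ...
    .stringify(someSerializer)
    .pipe(fs.createWriteStream(path, {flags: "a+"}))
    .on("finish", () => console.log("all items appended"));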

Hope that helps. :)

Upvotes: 1
