Ram

Reputation: 432

How to implement back pressure manually

I have a child process, and in it I am piping a stream to the parent.

in child.js

  let stream = readdirp(pathname);
  stream.pipe.pipe(process.stdout);   

in parent.js

let file = child => {
  let estream = es.map((data, next) => {
    _this.s3MultiUpload(JSON.parse(data), data, next);
    // I am uploading these files to S3.
  });
  child.on("end", (code, signal) => {
    console.log("stream ended"); // `here is my problem`
    child.kill();
  });
  child.on("exit", (code, signal) => {
    console.log(code);
    console.log(signal);
    child.kill();
  });
  return estream;
};
child = fork(filePath, { silent: true });
child.stdout.pipe(this.file(child));

My problem is that the stream ends before I have uploaded all the files to S3. I have read about backpressure, but I don't understand how to implement it here.

I think I need to add a callback or something similar to the process.stdout pipe, but I'm not sure.

Can you please help me?

Upvotes: 1

Views: 728

Answers (1)

Aluan Haddad

Reputation: 31803

The approach is unnecessarily complicated. Since I/O operations aren't CPU-bound, we are better off using Promises together with JavaScript's async/await and * syntax to perform the parallel file uploading. Building our own synchronization mechanisms is complex, and there are many overlapping language- and library-level concepts that arise [1].

Based on the readdirp documentation, but noting my unfamiliarity with the specific upload API, I'd suggest something along these lines:

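const readdirp = require('readdirp');
const util = require('util');
const fs = require('fs');

const readFile = util.promisify(fs.readFile);

(async function () {
  // Use streams to achieve small RAM & CPU footprint.
  // 1) Streams example with for-await. Node.js 10+ only.
  const paths = [];
  // fullPath is absolute; path would be relative to 'pending-uploads'.
  for await (const {fullPath} of readdirp('pending-uploads')) {
    paths.push(fullPath);
  }

  // Start one read -> parse -> upload chain per file.
  // s3MultiUpload is assumed here to return a promise.
  const uploadPromises = paths.map(async path => {
    const contents = await readFile(path, 'utf8');
    return s3MultiUpload(JSON.parse(contents));
  });

  // Resolves only after every upload has completed.
  await Promise.all(uploadPromises);
}());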
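
Note that Promise.all over the whole list starts every upload at once. If that turns out to be too many simultaneous requests, one option is to cap how many run in parallel. The following is only a minimal sketch of that idea: mapWithLimit is a hypothetical helper, not part of any library, and fakeUpload is a stand-in for s3MultiUpload, which I am assuming returns a promise.

```javascript
'use strict';

// Hypothetical helper: run `worker` over `items` with at most `limit`
// calls in flight at any moment. Results keep the order of `items`.
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0;

  // Each runner claims the next unprocessed index until none are left.
  async function runner() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const runners = [];
  for (let i = 0; i < Math.min(limit, items.length); i++) {
    runners.push(runner());
  }
  await Promise.all(runners);
  return results;
}

// Stand-in for the real upload (assumption: the real one returns a promise).
// It also records the highest number of simultaneous calls.
let inFlight = 0;
let maxInFlight = 0;
async function fakeUpload(name) {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise(resolve => setTimeout(resolve, 10));
  inFlight--;
  return `uploaded:${name}`;
}

// Upload five items, never more than two at a time.
const done = mapWithLimit(['a', 'b', 'c', 'd', 'e'], 2, fakeUpload);
```

In the snippet above, the paths array would take the place of the sample list, and the read, parse and upload steps would go inside the worker function.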

1. Back pressure is one of these concepts, arising from the process of porting the Reactive Extensions library to the JVM and implementing it in Java. Just for the sake of argument (sanity?), consider what Erik Meijer says regarding backpressure.
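
For comparison, if you do want to stay with streams, Node's core stream module already applies backpressure when the destination is a Writable that acknowledges each chunk only after its work is done. The sketch below is an illustration under assumptions, not the asker's actual code: fakeUpload stands in for the S3 upload and is assumed to return a promise, createUploadSink is a hypothetical name, and Readable.from (Node.js 12+) replaces the child's stdout.

```javascript
'use strict';
const { Readable, Writable, pipeline } = require('stream');

// Stand-in for the real upload (assumption: it returns a promise).
const uploaded = [];
function fakeUpload(entry) {
  return new Promise(resolve => setTimeout(() => {
    uploaded.push(entry.name);
    resolve();
  }, 10));
}

// A Writable that acknowledges each chunk only after its upload finishes.
// Until callback() runs, no further chunk is handed to write(), and once
// the small buffer is full the piped source is paused.
function createUploadSink(upload) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(entry, _encoding, callback) {
      upload(entry).then(() => callback(), callback);
    }
  });
}

// Sample source in place of the child process output.
const source = Readable.from([{ name: 'a' }, { name: 'b' }, { name: 'c' }]);

// pipeline's callback runs after the sink has finished, that is, after
// the last upload has been acknowledged, or on the first error.
const finished = new Promise((resolve, reject) => {
  pipeline(source, createUploadSink(fakeUpload), err => {
    if (err) reject(err);
    else resolve();
  });
});
```

The point to take from this is which event to wait on: the source ending only means that everything has been read, whereas the sink finishing means that everything has been uploaded.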

Upvotes: 1
