Jake Lowen
Jake Lowen

Reputation: 929

awaiting completion of a highland stream

I am writing a small script to stream download and process a number of sequentially named files from url. I am using highlandjs and have it working perfectly one by one. I am attempting to refactor into a loop wherein I await the completion of one highland stream prior to starting another:

// my attempt to abstract the highland into a promise I can await  
const processFile = async (url, path) => {
      const writeStream = fs.createWriteStream(path);

      return hl(request(url))
        .split()
         // various working transforms
        .map(splitDelim)
        .filter(filterSchedAOnly)
        .map(appendIdentity)
        .filter(filterOnlyIdentified)
        .map(convertToCSVsafeString)
        // end result should write to a file
        .pipe(writeStream)
        .toPromise();

   // also tried this
  // return new Promise((resolve, reject) => {
  //   writeStream.on("end", () => resolve());
  //   writeStream.on("error", () => reject());
  // });
};

(async () => {
  let i = 1;
  // infinite loop
  while (true) {
    const url = `http://.../${i}.txt`;
    const filePath = `${i}.processed.csv`;
    try {
      // does not work!
      await processFile(url, filePath);
    } catch (err) {
      console.log(err);
    }
    i++;
  }
})();

How should I wrap my processFile func such that I can await it's completion before moving on to the next iteration?

Upvotes: 1

Views: 523

Answers (1)

Jake Lowen
Jake Lowen

Reputation: 929

This appears to be working:

function streamToPromise(stream) {
  return new Promise(function(resolve, reject) {
    // resolve with location of saved file
    stream.on("finish", () => resolve());
    // stream.on("end", () => resolve());
    stream.on("error", () => reject());
  });
}

const processFile = async (url, path, i) => {

  const stream = hl(request(url))
    .split()
    // ......
    .pipe(fs.createWriteStream(path));

  return streamToPromise(stream);

};

Upvotes: 0

Related Questions