Joshua Ohana

Reputation: 6121

How to manage async callbacks synchronously, so all events are processed before moving on

I have a series of callbacks and events that need to follow a synchronous flow, but I cannot figure out how to organize the code to accomplish this.

A simplified version of the problem and code is as follows.

I have a list of zips on a server that I download to the local filesystem:

const zipFiles = [/* very long list of downloaded files */];
for (let i = 0; i < zipFiles.length; i++) {
  await processZipForExtraction(zipFiles[i]);
}

I want this await to actually wait until all of the async processing below has finished:

private async processZipForExtraction (file: Image): Promise<any> {
  return new Promise(async (resolve, reject) => {
    let writeStream = fs.createWriteStream(file.name);
    readStream.pipe(writeStream);

    writeStream.on('finish', () => {

      // open zips
      var zip = new StreamZip({
        file: './' + file.name,
        storeEntries: true
      });

      zip.on('ready', () => {
        console.log('All entries read');
        // this only means the below event got fired for each file, and I get a count
      });

      zip.on('entry', (entry) => {
        if (entry.name.endsWith('')) {

          zip.stream(entry.name, (err, stream) => {
            if (err) return reject(err);

            let uploadStream = this.uploadFromStream(entry.name);
            uploadStream.on('success', async () => {
              // this also means this entry in the zip is "done"
            });

            stream.pipe(uploadStream)
          });
        }
        else {
          // this also means this entry in the zip is "done"
        }
      });
    });
  });
}

The difficulty here is that I want to wait until each entry in the zip is "done". Sometimes there's an async stream event (if the entry in the zip matched), and sometimes there isn't. How can I reorganize the logic so that it waits for the following?

  1. Download each zip
  2. View each zip entry
  3. If it matches, process and upload stream
  4. Once all entries are both processed AND successfully uploaded:
     4a. delete the zip
     4b. resolve the original processZipForExtraction (so that only one zip is being processed at a time)

I have several thousand zips and they're multi-gig, so I want to download only one, process it, delete it, and then start on the next one.

I'm sure I can reorganize the logic so that everything can be awaited, but I can't figure out where or how: the zip.on 'entry' handler itself contains another stream event listener that I need to wait for, and I don't know how many files are in the zip until the 'ready' event fires.

Thanks in advance!

Upvotes: 0

Views: 56

Answers (1)

Henry Mueller

Reputation: 1327

Following up on my comments, here is what I came up with, combining two approaches:

  1. Use the StreamZip entries method to get the entries into a list rather than listening to the entry event.
  2. Wrap each upload stream async activity in its own promise, then use Promise.all to await all uploads completing. (You could also keep track of the uploads manually in your own state, up to you.)
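
As a standalone illustration of the second point, here is a minimal sketch of wrapping an event-based completion signal in a promise and awaiting several of them with Promise.all. The names `waitForEvent`, `fakeUpload`, and `uploadAll` are hypothetical, and the "error" failure event is an assumption; your upload stream may name its events differently.

```typescript
import { EventEmitter } from "node:events";

// Resolve when `emitter` fires `successEvent`, reject when it fires `failEvent`.
// Both event names are assumptions; use whatever your upload stream emits.
function waitForEvent(
  emitter: EventEmitter,
  successEvent: string,
  failEvent: string
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    emitter.once(successEvent, () => resolve());
    emitter.once(failEvent, (err: Error) => reject(err));
  });
}

// Hypothetical stand-in for an upload stream: emits "success" shortly after creation.
function fakeUpload(): EventEmitter {
  const upload = new EventEmitter();
  setTimeout(() => upload.emit("success"), 10);
  return upload;
}

// Start one fake upload per name, then wait until every one has reported success.
async function uploadAll(names: string[]): Promise<number> {
  const pending = names.map(() => waitForEvent(fakeUpload(), "success", "error"));
  await Promise.all(pending);
  return pending.length;
}
```

Because each upload is represented by a promise, the caller does not need to know in advance how many entries there are; it only needs the complete list of promises before calling Promise.all.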

This code is not tested and probably has some typos, but hopefully it gives you some ideas.

private processZipForExtraction(file: Image): Promise<string> {
    return new Promise((resolve, reject) => {
        let writeStream = fs.createWriteStream(file.name);
        readStream.pipe(writeStream);

        writeStream.on('finish', () => {

            // open zips
            var zip = new StreamZip({
                file: './' + file.name,
                storeEntries: true
            });

            // To make it easier to keep track of the status of the async
            // requests, wrap each upload in its own promise. This uses the
            // existing Promise abstraction in lieu of adding some sort of
            // state (probably a counter) to this class. Either works; the
            // choice is yours.
            const uploadEntry = (name) =>
                new Promise((resolveUpload, rejectUpload) => {
                    zip.stream(name, (err, stream) => {
                        // Note: this rejects the entire zip file processing, not just this upload.
                        // Return so we don't try to pipe an undefined stream below.
                        if (err) return reject(err);

                        let uploadStream = this.uploadFromStream(name);
                        uploadStream.on('success', resolveUpload);
                        // uploadStream must have a failure event for this to work, but since `success`
                        // is a non-standard stream event, I don't want to assume the name of the failure event.
                        uploadStream.on('THENAMEOFYOURFAILEVENT', rejectUpload);

                        stream.pipe(uploadStream)
                    });
                })

            zip.on('ready', async () => {
                console.log('All entries read');

                // This line is based on reading the tests of `node-stream-zip`.
                // Apparently `.entries()` returns an object whose property keys
                // are the file names. Not (at all) tested.
                const entryNameList = Object.keys(zip.entries());

                // Filter the entries you want to upload, then pass the entry
                // names to the `uploadEntry` to get the Promises wrapping each
                // async streaming operation.
                const uploadPromiseList = entryNameList
                    .filter(name => name.endsWith('')) // '' from original code, obv. you plan on putting something else here
                    .map(uploadEntry);

                // Wait for all the uploads to resolve. This assumes you want to
                // reject if any upload fails, but you could also use
                // Promise.allSettled if failed uploads don't matter.
                try {
                    await Promise.all(uploadPromiseList);
                    resolve('success');
                } catch (err) {
                    reject(err);
                }
            });

        });
    });
}
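
The comments in the code above mention Promise.allSettled as an alternative to Promise.all. A small sketch of the difference, using hypothetical helper names (`summarizeUploads`, `allOrNothing`) and plain promises in place of real uploads:

```typescript
// Count how many upload promises fulfilled and how many rejected,
// without stopping at the first failure.
async function summarizeUploads(
  uploads: Promise<unknown>[]
): Promise<{ succeeded: number; failed: number }> {
  const results = await Promise.allSettled(uploads);
  const failed = results.filter((r) => r.status === "rejected").length;
  return { succeeded: results.length - failed, failed };
}

// Promise.all, by contrast, rejects as soon as any input promise rejects.
async function allOrNothing(uploads: Promise<unknown>[]): Promise<boolean> {
  try {
    await Promise.all(uploads);
    return true;
  } catch {
    return false;
  }
}
```

Which one fits depends on whether a single failed upload should abort the whole zip (and keep the file around for a retry) or merely be recorded.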

You can add the code to delete the file after the Promise.all or outside this method after it resolves.
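
For the second option (deleting outside the method), a rough sketch of the calling loop might look like the following. `processZip` is a hypothetical stand-in for processZipForExtraction that just waits a few milliseconds, and the temp-file demo exists only to make the example runnable; `unlink` from Node's fs/promises does the deletion.

```typescript
import { existsSync } from "node:fs";
import { unlink, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Hypothetical stand-in for processZipForExtraction: resolves once "processing" is done.
async function processZip(path: string, log: string[]): Promise<void> {
  log.push(`start ${path}`);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  log.push(`done ${path}`);
}

// Handle one file at a time: the next iteration does not start until the
// current file has been processed and deleted.
async function processOneAtATime(paths: string[], log: string[]): Promise<void> {
  for (const path of paths) {
    await processZip(path, log);
    await unlink(path);
    log.push(`deleted ${path}`);
  }
}

// Demo with two throwaway files in the OS temp directory.
async function demo(): Promise<{ log: string[]; leftovers: number }> {
  const stamp = Date.now();
  const paths = ["a", "b"].map((n) => join(tmpdir(), `demo-${stamp}-${n}.zip`));
  for (const p of paths) await writeFile(p, "placeholder");
  const log: string[] = [];
  await processOneAtATime(paths, log);
  return { log, leftovers: paths.filter((p) => existsSync(p)).length };
}
```

Since the loop awaits each step, at most one file is on disk being processed at any time, which matches the download, process, delete, repeat flow described in the question.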

Upvotes: 1
