Koby 27

Reputation: 1120

Wait for async function and promises in it to finish

My task: I have a file that contains many items, and each item is associated with an array of image URLs that I need to download. I want to download all of the links. I'm using this library for the image downloading, and I'm using promises.

The problem: When I start to download images from many items, the program sends more than 4000 requests before the first one finishes, and the program crashes.

My solution: My idea was to handle only about 2 items at a time, so that I'm downloading about 20 images at a time. I've tried all sorts of variations with promises and async functions, but I'm pretty new to them, so my attempts failed.

My code flow is something like this:

csvRun()

function csvRun(){
    for(let i = 1; i <= itemsAmount; i++){  // Loops over the items
        // I want to be able to run only x items at a time
        console.log('Item number ' + i)
        itemHandle() 
    }
}

function itemHandle(){ // This function seems useless here but since the item has more data I kept it here
    handleImages()
}


function handleImages(){  // Loops over the images of the item
    for(let g = 0; g < imagesAmount; g++){        
        // Here there is a promise that downloads images
        // For the example I'll use settimout
        setTimeout(() => {
            console.log('Image downloaded ' + g)
        }, 3000);

        /** To reproduce the error, use ImgDownload instead of
            setTimeout and set imagesAmount to 20 and itemsAmount
            to 400
        */

    }

}

// Only here to recreate the error. Not necessarily relevant.
function ImgDownload(){
    var download = require('image-downloader')
    download // returns the promise so the handling could resume in order
    .image({
        url:
            "https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg",
        dest: "/folder/img.jpg"
    })
    .then(({ filename, image }) => {
        console.log("File saved to", filename);
    })
    .catch((err) => {
        console.error(err);
    });
}

Currently, the code finishes the loop in csvRun, printing Item number 1 through Item number {itemsAmount}, and after 3 seconds prints all of the Image downloaded messages. I understand why that happens. I want to change the code so that only 2 calls to itemHandle are made simultaneously at any time.
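For reference, the batching described above can be sketched with async/await: slice the items into pairs and wait for each pair to finish before starting the next. This is a minimal sketch, not the asker's code; itemHandle here is a hypothetical stand-in that is assumed to return a promise resolving when the item's images are done.

```javascript
// Hypothetical stand-in for the per-item work; the real itemHandle
// would download the item's images and return a promise.
function itemHandle(item) {
  return new Promise(resolve => setTimeout(() => resolve(item), 50));
}

// Process "batchSize" items at a time: await each batch's promises
// before starting the next batch.
async function csvRun(items, batchSize = 2) {
  const results = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    results.push(...await Promise.all(batch.map(itemHandle)));
  }
  return results;
}

csvRun([1, 2, 3, 4, 5]).then(results => console.log(results));
```

Note that a slow item in a batch blocks the next batch from starting; the answers below avoid that by starting a new task as soon as any worker finishes.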

Upvotes: 2

Views: 383

Answers (4)

Nick Tomlin

Reputation: 29221

I think I still prefer Jonas's implementation for being concise, but I'll add another to the ring. A few features:

  1. Results and errors are available in a stable array (based on position).
  2. This starts processing another item as soon as the individual worker function has finished, instead of batching things and waiting for each Promise.all to resolve.

function parallelMap(values, workFn, maxConcurrency = 2) {
  const length = values.length;
  const results = Array.from({ length });

  let pos = 0;
  let completed = 0;

  return new Promise(resolve => {
    function work() {
      if (completed === length) {
        return resolve(results);
      }

      if (pos >= length) {
        return;
      }

      const workIndex = pos;
      const item = values[workIndex];
      pos = pos + 1;

      return workFn(item, workIndex)
        .then(result => {
          results[workIndex] = result;
          completed = completed + 1;
          work();
        })
        .catch(result => {
          results[workIndex] = result;
          completed = completed + 1;
          work();
        });
    }

    for (let i = 0; i < maxConcurrency; i++) {
      work();
    }
  });
}

Usage:

async function fakeRequest({ value, time = 100, shouldFail = false }) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (shouldFail) {
        reject("Failure: " + value);
      } else {
        resolve("Success: " + value);
      }
    }, time);
  });
}

test("basic 'working' prototype", async () => {
  const values = [1, 2, 3, 4, 5, 6];
  const results = await parallelMap(values, value => {
    return fakeRequest({ value, time: 100, shouldFail: value % 2 === 0 });
  });

  expect(results).toEqual([
    "Success: 1",
    "Failure: 2",
    "Success: 3",
    "Failure: 4",
    "Success: 5",
    "Failure: 6"
  ]);
}, 350); // each task takes ~100ms to complete, 6 tasks, two at a time = ~300 ms

See the codesandbox for a full test suite.

Upvotes: 0

Shankar Regmi

Reputation: 874

Let's suppose your data looks like this

const items = [
  { id: 1,
    images: [
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
     ]
  },
  { id: 2,
    images: [
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
     ]
  },
  { id: 3,
    images: [
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
      'https://cdn.vox-cdn.com/thumbor/XKPu8Ylce2Cq6yi_pgyLyw80vb4=/0x0:1920x1080/1200x800/filters:focal(807x387:1113x693)/cdn.vox-cdn.com/uploads/chorus_image/image/63380914/PIA16695_large.0.jpg',
     ]
  }
];

I would run a simple for..of loop, iterate over the images, and download them item by item:

// this function downloads each item's images, one item at a time
const download = require('image-downloader')

const downloadImages = async (items = []) => {
  for (const item of items) {
    // dest is item.id/imageIndex.jpg
    const promises = item.images.map((url, index) =>
      download.image({ url, dest: `/folder/${item.id}/${index}.jpg` })
    );
    // wait for this item's images before moving on to the next item
    await Promise.all(promises);
  }
}

downloadImages(items);

Upvotes: 0

Robert Lombardo

Reputation: 152

With vanilla promises you might do something like:

let pending_fetches = 0
const MAX_CONCURRENT = 2

const fetch_interval = setInterval(() => {
    if (items.length === 0) return clearInterval(fetch_interval)

    if (pending_fetches < MAX_CONCURRENT) {
        ++pending_fetches
        doFetch(items.pop()).then(response => {
            // do stuff with the response
            --pending_fetches
        })
    }    
}, 100)

With async/await, something like:

const MAX_CONCURRENT = 2

const fetchLoop = async () => {
    while (items.length > 0) {
        const response = await doFetch(items.pop())
        // do stuff with the response
    }
}
for (let i = 0; i < MAX_CONCURRENT; ++i) fetchLoop()
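One caveat with the async/await version: if doFetch rejects, the await throws and that whole loop exits, quietly reducing concurrency for the remaining items. A hedged variant wraps the call in try/catch so each loop keeps draining the queue; doFetch here is a simulated stand-in for the real request function.

```javascript
const items = [1, 2, 3, 4];
const MAX_CONCURRENT = 2;

// Simulated request: rejects on even items to exercise the error path
function doFetch(item) {
  return new Promise((resolve, reject) =>
    setTimeout(() => (item % 2 ? resolve('ok ' + item) : reject(new Error('fail ' + item))), 10));
}

const fetchLoop = async () => {
  while (items.length > 0) {
    const item = items.pop(); // pop before awaiting, so no two loops take the same item
    try {
      const response = await doFetch(item);
      console.log(response);
    } catch (err) {
      console.error(err.message); // the loop survives and continues with the next item
    }
  }
};

for (let i = 0; i < MAX_CONCURRENT; ++i) fetchLoop();
```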

Upvotes: 0

Jonas Wilms

Reputation: 138267

One option would be to have a loop that goes over the images and processes them one after another. To run multiple of these in parallel, start multiple loops:

// Goes over the "data" array, calls and awaits "task" for each entry,
// and processes "runnerCount" tasks in parallel
function inParallel(task, data, runnerCount) {
  let i = 0;
  const results = [];

  async function runner() {
    while (i < data.length) {
      const pos = i++; // be aware: concurrent modification of i
      const entry = data[pos];
      results[pos] = await task(entry);
    }
  }

  const runners = Array.from({ length: runnerCount }, runner);

  return Promise.all(runners).then(() => results);
}

To be used as:

const delay = ms => new Promise(res => setTimeout(res, ms));

inParallel(async time => {
  console.log(`Timer for ${time}ms starts`);
  await delay(time);
  console.log(`Timer for ${time}ms ends`);
}, [5000, 6000, 1000]/*ms*/, 2/*in parallel*/);

Upvotes: 4
