Ishtar Clover

Reputation: 43

How to run a fixed number of functions in parallel in a loop in Node.js?

I'm looking for a way to run the same function 3 times at once in a loop, wait until all of them finish, and then continue with the next 3. I think it involves a loop and the Promise API, but my solution fails. It would be great if you could tell me what I did wrong and how to fix it.

Here is what I have done so far:


I have a download function (called downloadFile), a delay function (called runAfter), and a multi-download function (called downloadList). They look like this:

const https = require('https')
const fs = require('fs')
const { join } = require('path')
const chalk = require('chalk') // NPM
const mime = require('./MIME') // A small module that reads JSON and turns it into an object. It returns a file extension string.

exports.downloadFile = url => new Promise((resolve, reject) => {
    const req = https.request(url, res => {
        console.log('Accessing:', chalk.greenBright(url))
        console.log(res.statusCode, res.statusMessage)
        // console.log(res.headers)

        const ext = mime(res)
        const name = url
            .replace(/\?.+/i, '')
            .match(/[\ \w\.-]+$/i)[0]
            .substring(0, 250)
            .replace(`.${ext}`, '')
        const file = `${name}.${ext}`
        const stream = fs.createWriteStream(join('_DLs', file))

        res.pipe(stream)
        res.on('error', reject)

        stream
            .on('open', () => console.log(
                chalk.bold.cyan('Download:'),
                file
            ))
            .on('error', reject)
            .on('close', () => {
                console.log(chalk.bold.cyan('Completed:'), file)
                resolve(true)
            })
    })
    req.on('error', reject)
    req.end()
})

exports.runAfter = (ms, url) => new Promise((resolve, reject) => {
    setTimeout(() => {
        this.downloadFile(url)
            .then(resolve)
            .catch(reject)
    }, ms);
})

/* The list param is Array<String> only */
exports.downloadList = async (list, options) => {
    const opt = Object.assign({
        thread: 3,
        delayRange: {
            min: 100,
            max: 1000
        }
    }, options)

    // PROBLEM
    const multiThread = async (pos, run) => {
        const threads = []
        for (let t = pos; t < opt.thread + t; t++) threads.push(run(t))
        return await Promise.all(threads)
    }

    const inQueue = async run => {
        for (let i = 0; i < list.length; i += opt.thread)
            if (opt.thread > 1) await multiThread(i, run)
            else await run(i)
    }

    const delay = range => Math.floor(
        Math.random() * (new Date()).getHours() *
        (range.max - range.min) + range.min
    )

    inQueue(i => this.runAfter(delay(opt.delayRange), list[i]))
}

downloadFile downloads whatever the given link points to. runAfter waits a random number of milliseconds before executing downloadFile. downloadList receives a list of URLs and passes each one to runAfter to download. And that (downloadList) is where the trouble begins.

If I just pass the whole list through a simple loop and download one file at a time, it's easy. But with a large number of requests, like a list of 50 URLs, it would take a long time. So I decided to run 3 to 5 downloadFile calls in parallel instead of one. I was thinking about using async/await and Promise.all to solve the problem. However, it crashes. Below is the Node.js report:

<--- Last few GCs --->

[4124:01EF5068]    75085 ms: Scavenge 491.0 (493.7) -> 490.9 (492.5) MB, 39.9 / 0.0 ms  (average mu = 0.083, current mu = 0.028) allocation failure
[4124:01EF5068]    75183 ms: Scavenge 491.4 (492.5) -> 491.2 (493.2) MB, 29.8 / 0.0 ms  (average mu = 0.083, current mu = 0.028) allocation failure


<--- JS stacktrace --->

==== JS stack trace =========================================

    0: ExitFrame [pc: 00B879E7]
Security context: 0x03b40451 <JSObject>
    1: multiThread [04151355] [<project folder>\inc\Downloader.js:~62] [pc=03C87FBF](this=0x03cfffe1 <JSGlobal Object>,0,0x041512d9 <JSFunction (sfi = 03E2E865)>)
    2: inQueue [041513AD] [<project folder>\inc\Downloader.js:70] [bytecode=03E2EA95 offset=62](this=0x03cfffe1 <JSGlobal Object>,0x041512d9 ...

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

Writing Node.js report to file: report.20200428.000236.4124.0.001.json
Node.js report completed

Apparently a sub-function of downloadList (multiThread) is the cause, but I couldn't read those numbers (they look like physical RAM addresses or something), so I have no idea how to fix it. I'm not a professional engineer, so I would appreciate a good explanation.

Additional information:

In case you might ask:

Upvotes: 1

Views: 640

Answers (1)

Always Learning

Reputation: 5601

The for loop in multiThread never ends because its continuation condition is t < opt.thread + t. That is true whenever opt.thread is greater than zero, so the loop runs forever. Every pass calls run(t), which creates another promise and timer, so memory keeps growing until the heap is exhausted. That is the cause of your crash.
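
To see why the condition never becomes false, here is a minimal sketch of the same loop shape. The variable thread stands in for opt.thread, and the safety cap of 1000 is an assumption added only so the demonstration terminates; the original loop has no such cap.

```javascript
// `thread` stands in for opt.thread from the question.
const thread = 3;
let iterations = 0;

// Same condition shape as the original: t < thread + t.
// Subtracting t from both sides leaves 0 < thread, which never changes.
for (let t = 0; t < thread + t; t++) {
  iterations++;
  // Safety cap (not in the original code) so this demo stops.
  if (iterations >= 1000) break;
}

console.log('stopped only by the cap after', iterations, 'iterations');
```

Without the cap, each iteration in the original code also queues a download, which is why the process runs out of heap instead of just hanging.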

I suspect you wanted to do something like this:

const multiThread = async (pos, run) => {
  const threads = [];
  for (let t = 0; t < opt.thread && pos+t < list.length; t++)  {
    threads.push(run(pos + t));
  }
  return await Promise.all(threads);
};

The difference is that the loop now runs at most opt.thread times and also stops before going past the end of the list array.
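
The same batching idea can be sketched as a standalone helper. This is only an illustration under assumptions: runInBatches and fakeDownload are hypothetical names that are not part of the original code, and fakeDownload simply resolves after a short timer instead of touching the network.

```javascript
// Hypothetical helper: run `worker` over `items` in groups of `size`,
// waiting for each group to finish before starting the next one.
const runInBatches = async (items, size, worker) => {
  const results = [];
  for (let pos = 0; pos < items.length; pos += size) {
    // slice() stops at the end of the array, so the last batch may be shorter.
    const batch = items.slice(pos, pos + size);
    // Start every task in the batch, then wait for all of them.
    const done = await Promise.all(
      batch.map((item, k) => worker(item, pos + k))
    );
    results.push(...done);
  }
  return results;
};

// Stand-in for a real download: resolves after a short delay.
const fakeDownload = (url) =>
  new Promise((resolve) => setTimeout(() => resolve(`done:${url}`), 10));

runInBatches(['a', 'b', 'c', 'd', 'e'], 3, fakeDownload)
  .then((out) => console.log(out));
```

Using slice() to cut each batch removes the need for a separate bounds check, and returning the collected results lets the caller await the whole run.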

If list is not in scope inside multiThread (that is, list.length is not available there), you can drop the second part of the condition and handle it in the run callback instead, so that any index i past the end of the list is ignored:

inQueue(i => {
  // Return the promise so Promise.all in multiThread actually waits for the download
  if (i < list.length) return this.runAfter(delay(opt.delayRange), list[i])
})
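
One design note: batching with Promise.all waits for the slowest download in each group before starting the next group. If that matters, a different technique (not what the code above does) is to keep a fixed number of tasks in flight and start a new one as soon as any finishes. A minimal sketch, assuming a hypothetical helper named runWithLimit and a worker that returns a promise:

```javascript
// Hypothetical helper: keep at most `limit` tasks running at any time.
const runWithLimit = async (items, limit, worker) => {
  const results = new Array(items.length);
  let next = 0; // index of the next unclaimed item

  // Each lane repeatedly claims the next item until none are left.
  const lane = async () => {
    while (next < items.length) {
      const i = next++; // claimed synchronously, so lanes never share an item
      results[i] = await worker(items[i], i);
    }
  };

  // Start up to `limit` lanes and wait for all of them to drain the list.
  const lanes = Array.from({ length: Math.min(limit, items.length) }, () => lane());
  await Promise.all(lanes);
  return results;
};

// Stand-in task with varying durations.
const fakeTask = (ms) =>
  new Promise((resolve) => setTimeout(() => resolve(ms), ms));

runWithLimit([30, 10, 20, 5, 15], 3, fakeTask)
  .then((out) => console.log(out));
```

Results are stored by index, so the output order matches the input order even though tasks finish at different times.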

Upvotes: 1
