Glechik

Reputation: 135

Processing a multidimensional array with worker_threads

I have an interesting task, but I got confused while learning about workers. There is an array of 10-30K objects. I want to split it into as many subarrays as there are available threads and, in each subarray, run a function that searches for the needed object by certain fields.

Partitioning the array into subarrays and implementing the search function works fine. But I am having trouble starting the search with workers simultaneously in each subarray :(

I have only just started getting acquainted with workers and do not fully understand everything yet. I would be grateful for any help or advice.

P.S. When I run the code, I get the following error:

const { Worker, isMainThread, parentPort } = require('worker_threads');

function evalInWorker(f){
    if (isMainThread){
        return new Promise((res, rej) =>{
            const worker = new Worker(__filename, {eval: true});
            worker.on('error', e => rej(e));
            worker.on('message', msg => {
                res(msg);
            });
            worker.on('exit', code => {
                if(code !== 0)
                    rej(new Error(`Worker stopped with exit code ${code}`));
            });
        });
    }else {
        parentPort.postMessage(f());
    }
}
// getSlicedArr returns an array of subarrays; search looks for an object in a subarray by the needToFind properties
const tasks = (threads, dataArr, needToFind, arr = []) => {
    getSlicedArr(dataArr, threads).map( e => arr.push(evalInWorker(search(e, needToFind))));
    return arr;
};

Promise.all(tasks(subArrSize, dataArr, needToFind))
    .then(messList => {
        messList.forEach(m => console.log(m))
    })
    .catch(e => console.log(e));

(screenshot of the error)

Upvotes: 2

Views: 934

Answers (1)

Wilk

Reputation: 8113

For this purpose, you can take a look at the microjob library, which was built exactly for this kind of task.

Here's an example for your context (it uses TypeScript, but it is the same with plain JS):

import { start, stop, job } from 'microjob';

const main = async () => {
  await start();

  // build an array of 5 arrays filled with random numbers
  const table = Array.from({ length: 5 }).map(_ => Array.from({ length: 100 }).map(Math.random));

  // this is your search function (just a placeholder here)
  const search = (arr: number[]): number => {
    console.log(arr);

    return arr[0];
  };

  // job executes the search function inside a new thread
  // so the executions are made in parallel
  const res = await Promise.all(table.map(data => job(search, { data })));
  console.log(res);

  await stop();
};

main();
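If you prefer to stay with the built-in worker_threads module, a minimal sketch of the same fan-out is shown below. The sample data, the placeholder search function and the workerData shape are assumptions for illustration, not your actual getSlicedArr/search code:

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// Placeholder search: keep the objects whose properties match needToFind
const search = (subArr, needToFind) =>
  subArr.filter(obj => Object.keys(needToFind).every(k => obj[k] === needToFind[k]));

if (isMainThread) {
  // Main thread: spawn one worker per subarray and collect the results
  const runWorker = (subArr, needToFind) =>
    new Promise((resolve, reject) => {
      // the subarray and the search criteria travel to the worker via workerData
      const worker = new Worker(__filename, { workerData: { subArr, needToFind } });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', code => {
        if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });

  // Sample data standing in for getSlicedArr(dataArr, threads)
  const slices = Array.from({ length: 4 }, (_, i) =>
    Array.from({ length: 1000 }, (_, j) => ({ id: i * 1000 + j, group: j % 10 })));
  const needToFind = { group: 3 };

  Promise.all(slices.map(s => runWorker(s, needToFind)))
    .then(results => results.forEach(r => console.log(r.length)))
    .catch(console.error);
} else {
  // Worker thread: search its own slice and post the matches back
  parentPort.postMessage(search(workerData.subArr, workerData.needToFind));
}

The key difference from your snippet is that no function is sent to the worker at all: the worker file already contains the search code, and only plain, serializable data crosses the thread boundary.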

Upvotes: 1
