Svetoslav Trifonov
Svetoslav Trifonov

Reputation: 1

Node.js worker_threads - threadId keeps increasing for every newly created worker

I have a worker thread (let's call it the "Thread manager") in a Node 18 app that spawns other workers (let's call them "sub-workers"). Each sub-worker does its job and, when it has finished executing, sends a message to the "Thread manager", which then calls worker.terminate(). I recently also added worker.unref(). After a sleep interval the "Thread manager" spawns another "sub-worker", and the process keeps going until the app is stopped.

Here is my question: is it normal that every new "sub-worker" I create gets threadId n+1? For example 2, 3, 4, even though worker 2 has already finished. I asked ChatGPT and got more confused, because it says it is not normal for the number to keep growing...

Here is the code of the "Manager"

import {
  parentPort,
  workerData,
  isMainThread,
  Worker,
  threadId,
  WorkerOptions,
} from "worker_threads";
import { v4 as uuidv4 } from "uuid";
import type { TThreadCommand } from "../WorkerDispatcher";
import SyncerWorker from "../SyncerWorker";
import sleep from "../../../utils/sleep";
import * as jsonConfig from "./workers-config.json";

/**
 * Payload handed to the `onDone` / `onStop` callbacks of a sub-worker.
 */
type TWorkerStatusResponse = {
  // NOTE(review): presumably the thread id of the reporting sub-worker; this file never reads it — confirm
  threadId: number;
  // Worker id; the manager uses it to look the worker up in its own lists
  id: string;
  status: "success" | "error" | "info" | "warning" | "critical" | "stopped" | "started";
};

/**
 * Description of one sub-worker, either read from the workers config file
 * or rebuilt from a tracking record when a worker is relaunched.
 */
type TLoadWorker = {
  // Pause between runs; the scheduler compares it as `interval * 1000` against millisecond timestamps
  interval: number;
  // Forwarded to the sub-worker through `workerData`
  name: string;
  // Forwarded to the sub-worker through `workerData`
  class: string;
  // Script path; on first launch leading dots are stripped and the rest is appended to `__dirname`
  path: string;
  // Forwarded to the sub-worker through `workerData`
  // NOTE(review): presumably the shutdown grace period in seconds — confirm against SyncerWorker
  "max-close-time": number;
  // Disabled entries are skipped and never spawned
  disabled: boolean;
  // NOTE(review): typed as required, but the loader passes it on as an optional worker id — confirm whether the config always provides it
  id: string;
};

/**
 * Bookkeeping record the manager keeps per worker id. The record outlives
 * individual worker instances so a finished worker can be relaunched later.
 */
type TWorkerTrackinginfo = {
  id: string;
  // Thread id of the current instance; overwritten with the new instance's id on every relaunch
  threadId: number;
  // `Date.now()` timestamp, set when the worker is registered and again on relaunch
  lastActive: number;
  // Pause between runs; compared as `interval * 1000` against millisecond timestamps
  interval: number;
  // Live instance, or null once the worker has reported it is done
  workerInstance: SyncerWorker | null;
  // `Date.now()` timestamp of the first registration (not refreshed on relaunch)
  startAt: number;
  // 0 while a run is in progress; `Date.now()` timestamp once the run has finished
  endAt: number;
  // Script path as stored at first registration; reused unchanged for relaunches
  path: string;
  "max-close-time": number;
  disabled: boolean;
};

/**
 * Supervises a set of sub-workers (`SyncerWorker` instances).
 *
 * Workers are loaded from a JSON config, started once, and - after each one
 * reports it is done - relaunched when its configured interval has elapsed.
 * Status is reported to the parent through `parentPort` when one exists.
 *
 * Two lists are maintained:
 * - `workers`    holds only the instances that are currently alive;
 * - `workerInfo` holds one tracking record per worker id and survives relaunches.
 *
 * The type parameter `T` is unused but kept so existing
 * `new ThreadManager<...>()` call sites keep compiling.
 */
class ThreadManager<T> {
  /** Sub-worker instances that are currently alive. */
  public workers: SyncerWorker[] = [];
  /** One tracking record per worker id; kept across relaunches. */
  public workerInfo: TWorkerTrackinginfo[] = [];
  /** Once true the scheduling loop in `execute()` exits. */
  public terminated: boolean = false;
  /** True while a scheduling loop is active; callers use it to avoid starting a second loop. */
  public isRunning: boolean = false;

  /**
   * @param start          When true the scheduling loop is started immediately.
   * @param sleepInterval  Pause between scheduler ticks, in milliseconds.
   */
  constructor(start: boolean = false, public sleepInterval = 1000) {
    if (isMainThread) {
      console.log("This is main thread");
    } else {
      console.log("This is worker thread");
    }

    // workerData is null on the main thread, so it has to be read defensively
    // (the module-level instantiation already does the same).
    const configPath: string | undefined = workerData?.configPath;
    if (configPath) {
      this.loadWorkers(configPath);
    } else {
      this.loadWorkers();
    }

    if (start) {
      // load and start workers via execute
      this.isRunning = true;
      this.execute();
    }
  }

  /**
   * Spawns a sub-worker and registers it for tracking.
   *
   * A call counts as a relaunch only when `workerId` already has a tracking
   * record. A first launch always gets its path resolved against `__dirname`
   * and a new tracking record, even if the config supplied an explicit id.
   *
   * @param worker    Worker description; disabled workers are ignored.
   * @param workerId  Id to use for the worker; a UUID is generated when empty.
   */
  private addWorker(worker: TLoadWorker, workerId: string = ""): void {
    if (worker.disabled) {
      return;
    }

    const tracked = workerId ? this.workerInfo.find((w) => w.id === workerId) : undefined;
    const newWorkerId = workerId || uuidv4();

    // Tracked workers already store the resolved path; first launches carry a
    // config path whose leading dots are stripped before prefixing __dirname.
    const workerPath = tracked ? worker.path : __dirname + worker.path.replace(/^\.+/, "");

    const workerOptions: WorkerOptions = {
      workerData: {
        interval: worker.interval,
        name: worker.name,
        class: worker.class,
        path: workerPath,
        "max-close-time": worker["max-close-time"],
        id: newWorkerId,
      },
    };

    const syncerWorker = new SyncerWorker(
      __dirname + "/wrap.js",
      workerOptions,
      false,
      worker.interval
    );

    syncerWorker.onDone = (data: TWorkerStatusResponse) => {
      // release the instance and stamp the tracking record so it can be relaunched
      this.updateWorker(data.id);
    };

    syncerWorker.onStop = (_data: TWorkerStatusResponse) => {
      console.log("Worker stopped in worker-dispatcher-body.ts via Event");
      // Match by instance, not by id: a relaunched worker reuses the id, so an
      // id lookup could hit the new instance when an old one reports late.
      if (this.workers.includes(syncerWorker)) {
        this.terminateWorker(syncerWorker);
      }
    };

    this.workers.push(syncerWorker);

    if (tracked) {
      // relaunch: reuse the record, startAt intentionally stays at the first launch
      tracked.endAt = 0;
      tracked.lastActive = Date.now();
      tracked.threadId = syncerWorker.threadId;
      tracked.workerInstance = syncerWorker;
    } else {
      this.workerInfo.push({
        startAt: Date.now(),
        endAt: 0,
        id: newWorkerId,
        interval: worker.interval,
        lastActive: Date.now(),
        threadId: syncerWorker.threadId,
        workerInstance: syncerWorker,
        path: workerPath,
        "max-close-time": worker["max-close-time"],
        disabled: worker.disabled,
      });
    }
  }

  /**
   * Handles a finished run: terminates and forgets the live instance, then
   * records the end time so the scheduler can relaunch the worker later.
   */
  private updateWorker(workerId: string): void {
    const finished = this.workers.find((worker) => worker.id === workerId);
    if (!finished) {
      return;
    }

    this.removeWorker(finished);

    const info = this.workerInfo.find((w) => w.id === workerId);
    if (info) {
      info.endAt = Date.now();
      info.workerInstance = null;
    }
  }

  /**
   * Starts a new instance for an already tracked worker id.
   * Does nothing when the id is unknown.
   */
  relaunchWorker(id: string): void {
    const worker = this.workerInfo.find((w) => w.id === id);
    if (worker) {
      const workerInfo: TLoadWorker = {
        interval: worker.interval,
        name: "Relaunched " + worker.id,
        class: "Relaunched " + worker.id,
        path: worker.path,
        "max-close-time": worker["max-close-time"],
        id: worker.id,
        disabled: worker.disabled,
      };

      this.addWorker(workerInfo, worker.id);
    }
  }

  /** Terminates an instance and drops it from the live list; the tracking record is kept. */
  private removeWorker(worker: SyncerWorker): void {
    worker.terminate();
    worker.unref();
    this.workers = this.workers.filter((w) => w.id !== worker.id);
  }

  /**
   * Asks every live worker to stop.
   *
   * @param immediate  Sends "immediate-stop" instead of "stop" when true.
   */
  public terminateAll(immediate: boolean = false): void {
    this.workers.forEach((worker) => {
      worker.sendMessage({
        message: "terminate",
        command: immediate ? "immediate-stop" : "stop",
        data: null,
      });
    });
  }

  /**
   * Terminates a worker for good: the instance is dropped from the live list
   * and its tracking record is deleted, so it will not be relaunched. When no
   * tracking records remain the manager marks itself terminated and reports
   * "done"/"stopped" to the parent.
   */
  public terminateWorker(worker: SyncerWorker | undefined): void {
    if (!worker) return;

    worker.terminate();
    // Drop the instance as well, otherwise `workers` keeps a stale reference
    // that terminateAll()/ping() would continue to address.
    this.workers = this.workers.filter((w) => w !== worker);
    this.workerInfo = this.workerInfo.filter((w) => w.id !== worker.id);
    worker.unref();

    if (this.workerInfo.length === 0) {
      this.terminated = true;
      parentPort?.postMessage({
        command: "done",
        status: "stopped",
      } as TThreadCommand);
    }
  }

  /** Permanently terminates the live worker with the given id, if any. */
  public terminateWorkerById(id: string): void {
    const worker = this.workers.find((w) => w.id === id);
    this.terminateWorker(worker);
  }

  /** Permanently terminates the live worker with the given thread id, if any. */
  public terminateWorkerByThreadId(threadId: number): void {
    const worker = this.workers.find((w) => w.threadId === threadId);
    this.terminateWorker(worker);
  }

  /**
   * Dynamically imports the workers config and spawns every enabled worker.
   *
   * @param configPath  Module path of the JSON config; it must expose
   *                    `default.workers`.
   */
  async loadWorkers(configPath: string = "./workers-config.json"): Promise<void> {
    const config = await import(configPath);
    const workers: TLoadWorker[] = config.default.workers;
    for (const worker of workers) {
      this.addWorker(worker, worker.id);
    }
  }

  /** Logs "Ping" and sends a "ping" command to the parent, if there is one. */
  report(): void {
    console.log("Ping");
    parentPort?.postMessage({
      command: "ping",
    } as TThreadCommand);
  }

  /**
   * Sends a "ping" command to one live worker, or to all of them when no id
   * is given.
   */
  ping(id?: string): void {
    if (id) {
      const worker = this.workers.find((w) => w.id === id);
      if (worker) {
        worker.sendMessage({ command: "ping" });
      }
    } else {
      this.workers.forEach((worker) => {
        worker.sendMessage({ command: "ping" });
      });
    }
  }

  /**
   * Scheduling loop: every `sleepInterval` milliseconds relaunches each
   * tracked worker whose last run ended more than `interval` seconds ago.
   * Runs until `terminated` becomes true.
   *
   * `isRunning` is maintained here so that the loop is marked active no
   * matter who started it, and is cleared again when the loop exits.
   */
  public async execute(): Promise<void> {
    this.isRunning = true;
    try {
      while (!this.terminated) {
        await sleep(this.sleepInterval);

        const now = Date.now();
        const workersToStart = this.workerInfo.filter((w) => {
          // still running
          if (w.workerInstance && w.workerInstance.isRunning) return false;
          // endAt === 0 means the current run has not finished yet
          if (w.endAt === 0) return false;

          return now - w.endAt > w.interval * 1000;
        });

        for (const worker of workersToStart) {
          this.relaunchWorker(worker.id);
        }
      }
    } finally {
      this.isRunning = false;
    }
  }
}

// Single manager instance for this module. When workerData does not provide
// them, `start` defaults to true and the tick interval to 1000.
const threadManager = new ThreadManager<TThreadCommand>(
  workerData?.start ?? true,
  workerData?.interval ?? 1000
);

// Commands arriving from the parent thread drive the manager's lifecycle.
parentPort?.on("message", (p: TThreadCommand<TWorkerStatusResponse>) => {
  switch (p.command) {
    case "start":
      console.log("Start command received in worker dispatcher body");
      // only spin up the scheduling loop when none is marked as running
      if (!threadManager.isRunning) {
        threadManager.execute();
      }
      break;

    case "stop":
      // graceful: flag the loop to end and ask every worker to finish up
      threadManager.terminated = true;
      threadManager.isRunning = false;
      threadManager.terminateAll();
      break;

    case "immediate-stop":
      // same as "stop", but workers are told to stop immediately
      threadManager.terminated = true;
      threadManager.isRunning = false;
      threadManager.terminateAll(true);
      break;

    default:
      // every other command is ignored here
      break;
  }
});

Here is my terminate() function from the extended class

/**
 * Gracefully shuts the worker down.
 *
 * Marks this instance as terminated, sends a "stop" command to the thread,
 * then polls once per second until `canClose` is true or `maxCloseTime`
 * seconds have passed. Finally delegates to the base class `terminate()`.
 *
 * @returns The result of the base class `terminate()`.
 */
async terminate(): Promise<number> {
    this._terminated = true;
    this._isRunning = false;

    // ask the thread to wind down on its own first
    this._sendMessage({ message: "terminate", command: "stop", data: null });

    const closingSince = Date.now();
    // true while the grace period (maxCloseTime, in seconds) has not run out
    const withinGracePeriod = (): boolean =>
      Date.now() - closingSince < this.maxCloseTime * 1000;

    while (!this.canClose && withinGracePeriod()) {
      await sleep(1000);
    }

    console.log("Dispatcher is terminated");
    return super.terminate();
  }

I think I may have a memory leak. The reason I think so is that, after running for several hours in a Docker container, threadId reaches, for example, 250, or sometimes 140.

I tried calling worker.unref() after terminate(), because I read that it has no effect if the worker is already terminated, but the threadId number is still growing.

Thanks, any help appreciated

Upvotes: 0

Views: 50

Answers (0)

Related Questions