Daksh Gargas

Reputation: 3943

Run NestJS worker in a separate process

I am implementing a NestJS worker with queues, using Bull.

According to the documentation, the worker and the server both run in the same "process", but I want to run the worker in a separate process so that it does not block the main event loop.

I think it's called "running a task in a separate binary" or something else.

Anyway, I tried googling it and went through the NestJS documentation, but couldn't find anything similar.

++ In other words:

I have a main project (my current), and I want to create the worker in a separate process (standalone application) and want to connect both my current main project and worker. And I can't really find it in the documentation.

In which module should I instantiate my Bull's instance? I am assuming I'll keep my producer in my main module and consumer in my worker module.

How can I do so?

Please note that by "separate process" I do not mean running a specific task in a separate process, as defined in Bull's documentation. I want to deploy the whole worker module in a separate process, or whatever the correct term is.

++ [Extra, if possible]

Before running my server and worker, I also want to check whether my worker (Bull instance) is successfully connected to my Redis server. I couldn't find anything in Bull's documentation... do you think there is a good workaround for that?

Upvotes: 29

Views: 34377

Answers (3)

opinfosec

Reputation: 11

I don't have enough reputation to comment, so I'm posting this as an answer: you can certainly run the worker as a CLI-style process without exposing any HTTP endpoints.

I use pm2 to run multiple workers.

Notice that in worker.ts I use createApplicationContext and never call app.listen(...); see https://docs.nestjs.com/standalone-applications

worker.ts

import { NestFactory } from '@nestjs/core'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { WorkersModule } from './services/queue/workers-email/workers.module'

async function bootstrap() {
    const app = await NestFactory.createApplicationContext(WorkersModule)
    const config = app.get(ConfigService)
    ConfigModule.forRoot({ isGlobal: true })

    app.useLogger(
        config.get<string>('NODE_ENV') === 'development'
            ? ['log', 'debug', 'error', 'verbose', 'warn']
            : ['log', 'error', 'warn'],
    )
}

bootstrap()

workers.module.ts

import { BullModule } from '@nestjs/bullmq'
import { Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { redisFactory } from '../../../factories/redis.factory'
import { workerProcessor } from './workers.processor'
   
@Module({
    imports: [
        BullModule.forRootAsync({
            imports: [ConfigModule],
            useFactory: redisFactory,
            inject: [ConfigService],
        }),
        BullModule.registerQueueAsync({ name: 'queue-name' }),
    
    ],
    providers: [
        ConfigService,
        workerProcessor,
    ],
})
export class WorkersModule {}

workers.processor.ts

import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'
import { Logger } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import { Job } from 'bullmq'

@Processor('queue-name')
export class workerProcessor extends WorkerHost {
    private logger = new Logger('processor')

    constructor(private config: ConfigService) {
        super()
    }

    async process(job: Job<any, any, string>): Promise<any> {
        // ... process the job here ...
    }

    @OnWorkerEvent('completed')
    onCompleted(job: Job<any, any, string>) {
        this.logger.log(`Job ${job.id} ${job.name.toUpperCase()} Completed`)
    }

    @OnWorkerEvent('failed')
    onFailed(job: Job<any, any, string>) {
        this.logger.error(`Job ${job.id} ${job.name.toUpperCase()} Failed`)
    }
}

Upvotes: 1

Daksh Gargas

Reputation: 3943

Goal: Horizontal Scaling ✅

While the answer provided by Isolated should work, I didn't want to run a whole new project and import my modules as they suggested. So after some more R&D, I've figured out a different way to do so.

Just as the "parent" project has its index.ts or main.ts file, create a worker.ts and a worker.module.ts, in the same directory or elsewhere.

In worker.module.ts, register the Bull module again [BullModule.forRoot({})] and include all the imports that your consumers need.

Add your consumers to the providers, and you're good to go.
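As a rough sketch of what such a worker.module.ts could look like with @nestjs/bull: the queue name 'my-queue', the MyConsumer class and the REDIS_HOST / REDIS_PORT variables are placeholders I made up for illustration, not names from my project.

```typescript
import { BullModule, Process, Processor } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { Job } from 'bull';

// Hypothetical consumer; in a real project this would live in its own file.
@Processor('my-queue')
export class MyConsumer {
  // Handles every job on 'my-queue' that was added without a name.
  @Process()
  async handle(job: Job<unknown>): Promise<void> {
    // do something with job.data
  }
}

@Module({
  imports: [
    // Register Bull again: the worker is a separate application context,
    // so it needs its own Redis connection settings.
    BullModule.forRoot({
      redis: {
        host: process.env.REDIS_HOST ?? 'localhost', // assumed env variable
        port: Number(process.env.REDIS_PORT ?? 6379), // assumed env variable
      },
    }),
    // Must use the same queue name as the producer in the main module.
    BullModule.registerQueue({ name: 'my-queue' }),
  ],
  providers: [MyConsumer],
})
export class WorkerModule {}
```

The main module keeps only the producer side (registerQueue plus the service that injects the queue), so the two processes share nothing but Redis.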

The worker.ts would look like this (nothing fancy):

import { NestFactory } from '@nestjs/core';
import { NestExpressApplication } from '@nestjs/platform-express';
import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston';
import { WorkerModule } from './worker/worker.module';

async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(WorkerModule);
  app.useLogger(app.get(WINSTON_MODULE_NEST_PROVIDER));
  process.env.WORKER_HTTP_PORT = process.env.WORKER_HTTP_PORT ?? '4001';
  await app.listen(process.env.WORKER_HTTP_PORT);
  console.debug(`Worker is running on ${await app.getUrl()}`);
}
bootstrap();

Your nest-cli.json should look something like this

{
  "collection": "@nestjs/schematics",
  "sourceRoot": "src",
  "entryFile": "main",
  "compilerOptions": {
    "assets": ["**/*.graphql"],
    "watchAssets": true
  }
}

and create a new nest-cli-worker.json

{
    "collection": "@nestjs/schematics",
    "sourceRoot": "src",
    "entryFile": "worker",
    "compilerOptions": {
        "watchAssets": true
    }
}

Now, the question is, how to run it?

I am using yarn commands to start my server (defining them in package.json)

To start my server, I'd use

"start:dev": "yarn nest start --watch -e 'node -r dotenv/config -r source-map-support/register'"

or

"start:prod": "node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/main.js"

and to start my worker, I'd run the following command(s) in another (terminal) shell...

Dev

"worker:start:dev": "yarn nest start --config nest-cli-worker.json --watch -e 'node -r dotenv/config -r source-map-support/register'"

or

Prod

"worker:start:prod": "node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/worker.js"

P.S. You don't necessarily have to preload dotenv/config.

Bonus:

If you want to run your server(s) in Docker, here's my docker-compose.yaml file

version: '3.8'

services:
  main:
    container_name: my-server
    image: xxx.amazonaws.com/xx/xxx:${CONTAINER_IMAGE_TAG:-latest}
    ports:
      - 80:80
    command: node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/main.js # prod command for the main server
    volumes:
      - xxx
    links:
      - xxx
    environment:
      xxx
    # .env is generated by Elastic Beanstalk, don't provide one
    env_file:
      - .env
  worker:
    container_name: worker-server # your worker
    image: xxx.us-west-2.amazonaws.com/xxx:${CONTAINER_IMAGE_TAG:-latest}
    ports:
      - 90:90
    links:
      - xxx
    command: node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/worker.js # prod command for the worker
    volumes:
      - xxx
    environment:
      xxx
    # .env is generated by Elastic Beanstalk, don't provide one
    env_file:
      - .env
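On the extra part of the question (checking the Redis connection before starting): one possible approach, which is not taken from Bull's documentation, is a plain TCP check with Node's built-in net module before bootstrapping. The helper name pingRedis and its parameters are my own, and the sketch assumes a Redis server without AUTH or TLS. Bull's Queue also has an isReady() method, which as far as I know resolves once the connection is established, so that may be the simpler route inside the app itself.

```typescript
import * as net from "node:net";

/**
 * Resolves to true if the server at host:port answers an inline PING
 * with +PONG within timeoutMs, and to false in every other case
 * (connection refused, timeout, unexpected reply).
 * Assumption: the Redis server requires neither AUTH nor TLS.
 */
export function pingRedis(
  host: string,
  port: number,
  timeoutMs = 2000,
): Promise<boolean> {
  return new Promise((resolve) => {
    const socket = net.createConnection({ host, port });
    let settled = false;

    // Settle exactly once, then release the socket and the timer.
    const finish = (ok: boolean): void => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      socket.destroy();
      resolve(ok);
    };

    const timer = setTimeout(() => finish(false), timeoutMs);

    socket.once("connect", () => socket.write("PING\r\n"));
    socket.once("data", (chunk: Buffer) =>
      finish(chunk.toString("utf8").startsWith("+PONG")),
    );
    socket.once("error", () => finish(false));
    socket.once("close", () => finish(false));
  });
}
```

In worker.ts (or main.ts) you could await pingRedis(host, port) at the top of bootstrap() and exit with a non-zero code when it returns false, so Docker or pm2 restarts the process until Redis is reachable.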

Upvotes: 27

Isolated

Reputation: 2453

You can use that documentation to implement the entire worker. If you use Nest.js in standalone mode you can just have Processor(s) and Process(es).

This is documented here. “Separate binary” isn’t the right term either: a binary is the product of compilation, and Node.js isn’t compiled, so what you need is a separate application.

You don’t need a workaround for anything, this is literally the nature of Bull and optionally Nest.js.

Sometimes you’ll need to adapt examples in docs to fit your needs, this can take some time to learn.

Terminology

I think there's some confusion with terminology so in this post assume that:

  1. A process is what your application runs inside (if you look in your OS process manager it should be node).
  2. An application is one Node.js project that runs in a separate process.
  3. A worker is an application that is focused only on processing Queue jobs.
  4. Queue and Job are Bull terminology.
  5. Processor and Process are terminology of the Nest.js package @nestjs/bull.

Solution

Here is how you create an application with a worker, each running in its own process. After following these instructions, you should see two processes in your process manager.

Create a new Nest.js application that we'll use for your worker:

nest new my-worker

Open src/main.ts and replace everything in bootstrap function with:

const app = await NestFactory.createApplicationContext(AppModule);

Install Bull and the Nest.js implementation with:

yarn add @nestjs/bull bull

Open src/app.module.ts, remove AppController from controllers, and add BullModule.registerQueue (from @nestjs/bull) to imports.

Your src/app.module.ts should now look like:

// app.module.ts
// ... imports
@Module({
  imports: [
    BullModule.registerQueue({
      name: 'my-queue',
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}

Create a new file: app.processor.ts in src directory:

// app.processor.ts
// ... imports
@Processor('my-queue')
export class AppConsumer {
    @Process('namedJob')
    async processNamedJob(job: Job<any>): Promise<any> {
        // do something with job and job.data
    }
}

Add AppConsumer to the providers array of the worker's AppModule so that Nest registers it, and the worker side is done. Then, in your application (main project), update its AppModule to include BullModule.registerQueue (as above) and inject the queue:

export class MyService {
  constructor(@InjectQueue('my-queue') private queue: Queue) {}
}

And then use this.queue.add('namedJob', data);
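One thing that is easy to get wrong across two applications is the job name: it has to match the name given to @Process exactly, including case. A minimal sketch of keeping it in one shared constant follows. The JobQueue interface, NamedJobData and enqueue are hypothetical names for illustration; in the real service the constructor parameter would be the Bull Queue injected with @InjectQueue('my-queue') as shown above.

```typescript
// Shared by the main application and the worker so the names cannot drift.
export const QUEUE_NAME = "my-queue";
export const NAMED_JOB = "namedJob";

// Hypothetical payload type for the job.
export interface NamedJobData {
  userId: string;
}

// Only the part of the queue that the producer needs (an assumption made
// here so the sketch runs without Redis; Bull's Queue has an add(name, data)).
export interface JobQueue {
  add(name: string, data: unknown): Promise<unknown>;
}

export class MyService {
  private readonly queue: JobQueue;

  constructor(queue: JobQueue) {
    this.queue = queue;
  }

  // Producer side: enqueue a job under the shared name.
  async enqueue(data: NamedJobData): Promise<void> {
    await this.queue.add(NAMED_JOB, data);
  }
}
```

On the worker side the same constants would be used as @Processor(QUEUE_NAME) and @Process(NAMED_JOB), which removes the chance of a typo between the two projects.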

Try the above, and if you get stuck, create a repository on GitHub and I'll get you on the right track.

Reference

  1. https://github.com/OptimalBits/bull#separate-processes
  2. https://docs.nestjs.com/standalone-applications

Upvotes: 32
