Reputation: 3943
I am implementing a NestJS worker with queues, using Bull.
According to the documentation, the worker and the server run in the same process, but I want to run the worker in a separate process so that it does not block the main event loop.
I think it's called "running a task in a separate binary" or something similar.
Anyway, I tried googling it and went through the NestJS documentation, but couldn't find anything similar.
++ In other words:
I have a main project (my current), and I want to create the worker in a separate process (standalone application) and want to connect both my current main project and worker. And I can't really find it in the documentation.
In which module should I instantiate my Bull instance? I am assuming I'll keep my producer in my main module and my consumer in my worker module.
How can I do so?
Please note that by "separate process" I do not mean running a specific task in a separate process, as described in Bull's documentation. I want to deploy the whole worker module in a separate process, or whatever the correct term is.
++ [Extra, if possible]
Before running my server and worker, I also want to check whether my worker (Bull instance) is successfully connected to my Redis server. I couldn't find anything in Bull's documentation. Do you think there is a good workaround for that?
Upvotes: 29
Views: 34377
Reputation: 11
Since I don't have enough reputation to comment, I'm replying here about running the worker from the CLI without exposing endpoints: you can certainly do that.
I use pm2 to run multiple workers.
Notice that in worker.ts I use createApplicationContext and don't call app.listen(...): https://docs.nestjs.com/standalone-applications
worker.ts
import { NestFactory } from '@nestjs/core'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { WorkersModule } from './services/queue/workers-email/workers.module'

async function bootstrap() {
  const app = await NestFactory.createApplicationContext(WorkersModule)
  const config = app.get(ConfigService)
  ConfigModule.forRoot({ isGlobal: true })
  app.useLogger(
    config.get<string>('NODE_ENV') === 'development'
      ? ['log', 'debug', 'error', 'verbose', 'warn']
      : ['log', 'error', 'warn'],
  )
}

bootstrap()
workers.module.ts
import { BullModule } from '@nestjs/bullmq'
import { Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { redisFactory } from '../../../factories/redis.factory'
import { workerProcessor } from './workers.processor'

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: redisFactory,
      inject: [ConfigService],
    }),
    BullModule.registerQueueAsync({ name: 'queue-name' }),
  ],
  providers: [
    ConfigService,
    workerProcessor,
  ],
})
export class WorkersModule {}
workers.processor.ts
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'
import { Logger } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import { Job } from 'bullmq'

@Processor('queue-name')
export class workerProcessor extends WorkerHost {
  private logger = new Logger('processor')

  constructor(private config: ConfigService) {
    super()
  }

  // Called for every job picked up from 'queue-name'.
  async process(job: Job<any, any, string>): Promise<any> {
    // ... process here ...
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job<any, any, string>) {
    this.logger.log(`Job ${job.id} ${job.name.toUpperCase()} Completed`)
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job<any, any, string>) {
    this.logger.error(`Job ${job.id} ${job.name.toUpperCase()} Failed`)
  }
}
Upvotes: 1
Reputation: 3943
Goal: Horizontal Scaling ✅
While the answer provided by Isolated should work, I didn't want to run a whole new project and import my modules as they suggested. So after some more R&D, I've figured out a different way to do so.
Just as we have our index.ts or main.ts file for our "parent" project, create a worker.ts and a worker.module.ts in the same directory (it doesn't have to be the same one).
In worker.module.ts, make sure you register your Bull module again [BullModule.forRoot({})] and include all the imports that your consumer needs. In the providers, add your consumers, and you're good to go.
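As a rough illustration of the module described above, here is a minimal sketch of what such a worker.module.ts might contain. It assumes the @nestjs/bull package; the queue name 'my-queue', the Redis host and port, and the MyQueueConsumer class are hypothetical placeholders, not taken from my actual project.

```typescript
// worker.module.ts (sketch; everything except the @nestjs/bull API is a hypothetical name)
import { BullModule, Process, Processor } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { Job } from 'bull';

// Hypothetical consumer; in a real project this would normally live in its own file.
@Processor('my-queue')
export class MyQueueConsumer {
  @Process()
  async handle(job: Job<unknown>): Promise<void> {
    // Do the actual work with job.data here.
  }
}

@Module({
  imports: [
    // Assumed connection settings; point these at your own Redis server.
    BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }),
    // The queue name must match the one registered by the main application.
    BullModule.registerQueue({ name: 'my-queue' }),
  ],
  // Consumers are registered as providers so that Nest instantiates them.
  providers: [MyQueueConsumer],
})
export class WorkerModule {}
```

The main application would register the same queue name on its side and only add jobs, so the two processes share nothing except the Redis connection.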
The worker.ts would look like this (nothing fancy):
import { NestFactory } from '@nestjs/core';
import { NestExpressApplication } from '@nestjs/platform-express';
import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston';
import { WorkerModule } from './worker/worker.module';

async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(WorkerModule);
  app.useLogger(app.get(WINSTON_MODULE_NEST_PROVIDER));
  process.env.WORKER_HTTP_PORT = process.env.WORKER_HTTP_PORT ?? '4001';
  await app.listen(process.env.WORKER_HTTP_PORT);
  console.debug(`Worker is running on ${await app.getUrl()}`);
}

bootstrap();
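On the extra part of the question (checking the Redis connection before starting): one possible approach, sketched here under assumptions, relies on Bull's Queue having an isReady() method that returns a promise resolving once the queue is connected to Redis. Since that promise may stay pending while the client keeps retrying, the hypothetical helper below (assertQueueReady is my own name, not part of Bull) adds a timeout. It only depends on the shape of isReady(), so it has no imports.

```typescript
// Minimal shape the helper needs. Bull's Queue has an isReady() method that
// returns a promise, so a Queue instance is expected to fit this type.
interface ReadyCheckable {
  isReady(): Promise<unknown>;
}

// Hypothetical helper: resolves once the queue reports ready,
// rejects if that does not happen within timeoutMs.
export async function assertQueueReady(
  queue: ReadyCheckable,
  timeoutMs = 5000,
): Promise<void> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Queue not ready after ${timeoutMs} ms`)),
      timeoutMs,
    );
  });
  try {
    // Whichever settles first wins: the ready signal or the timeout.
    await Promise.race([queue.isReady(), timeout]);
  } finally {
    // Always clear the timer so it does not keep the process alive.
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

In worker.ts this could be called before app.listen, for example by fetching the queue with app.get(getQueueToken('my-queue')) (getQueueToken comes from @nestjs/bull; the queue name is a placeholder) and awaiting assertQueueReady on it, exiting the process if it throws.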
Your nest-cli.json should look something like this:
{
  "collection": "@nestjs/schematics",
  "sourceRoot": "src",
  "entryFile": "main",
  "compilerOptions": {
    "assets": ["**/*.graphql"],
    "watchAssets": true
  }
}
and create a new nest-cli-worker.json
{
  "collection": "@nestjs/schematics",
  "sourceRoot": "src",
  "entryFile": "worker",
  "compilerOptions": {
    "watchAssets": true
  }
}
Now, the question is: how do you run it?
I am using yarn commands to start my server (defined in package.json).
To start my server, I'd run
"start:dev": "yarn nest start --watch -e 'node -r dotenv/config -r source-map-support/register'"
or
"start:prod": "node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/main.js"
and to start my worker, I'd run the following command(s) in another (terminal) shell.
Dev:
"worker:start:dev": "yarn nest start --config nest-cli-worker.json --watch -e 'node -r dotenv/config -r source-map-support/register'"
or
Prod:
"worker:start:prod": "node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/worker.js"
P.S. You don't necessarily have to add dotenv/config.
Bonus:
If you want to run your server(s) in Docker, here's my docker-compose.yaml file:
version: '3.8'
services:
  main:
    container_name: my-server
    image: xxx.amazonaws.com/xx/xxx:${CONTAINER_IMAGE_TAG:-latest}
    ports:
      - 80:80
    command: node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/main.js # my `prod` command for the main server
    volumes:
      - xxx
    links:
      - xxx
    environment:
      xxx
    # .env is generated by Elastic Beanstalk, don't provide one
    env_file:
      - .env
  worker:
    container_name: worker-server # your worker
    image: xxx.us-west-2.amazonaws.com/xxx:${CONTAINER_IMAGE_TAG:-latest}
    ports:
      - 90:90
    links:
      - xxx
    command: node -r dotenv/config -r ./tsconfig-paths-bootstrap.js dist/src/worker.js # `prod` command for the worker
    volumes:
      - xxx
    environment:
      xxx
    # .env is generated by Elastic Beanstalk, don't provide one
    env_file:
      - .env
Upvotes: 27
Reputation: 2453
You can use that documentation to implement the entire worker. If you use Nest.js in standalone mode you can just have Processor(s) and Process(es).
This is documented here. “Separate binary” isn’t really the issue either: a binary is the product of compilation, and Node.js isn’t compiled, so what you need is a separate application.
You don’t need a workaround for anything; this is literally the nature of Bull and, optionally, Nest.js.
Sometimes you’ll need to adapt examples in the docs to fit your needs, and this can take some time to learn.
I think there's some confusion with terminology, so in this post assume that:
- process is what your application runs inside (if you look in your OS process manager it should be node).
- application is one Node.js project that runs in a separate process.
- worker is an application that is only focused on processing Queue jobs.
- Queue and Job are terminology of Bull.
- Processor and Process are terminology of Nest.js (@nestjs/bull).
Here is how you create an application with a worker running in separate processes. After following these instructions, you should see two processes running in your process manager.
Create a new Nest.js application that we'll use for your worker:
nest new my-worker
Open src/main.ts and replace everything in the bootstrap function with:
const app = await NestFactory.createApplicationContext(AppModule);
Install Bull and the Nest.js implementation with:
yarn add @nestjs/bull bull
Open src/app.module.ts, remove AppController from controllers, and add BullModule.registerQueue (from @nestjs/bull) to imports.
Your src/app.module.ts should now look like:
// app.module.ts
// ... imports
@Module({
  imports: [
    BullModule.registerQueue({
      name: 'my-queue',
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}
Create a new file, app.processor.ts, in the src directory:
// app.processor.ts
// ... imports
@Processor('my-queue')
export class AppConsumer {
  @Process('namedjob')
  async processNamedJob(job: Job<any>): Promise<any> {
    // do something with job and job.data
  }
}
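And you're done for the worker side of things; just make sure AppConsumer is listed in the providers of the worker's AppModule so that Nest instantiates it. Now all you need to do in your application (main project) is update your AppModule to include BullModule.registerQueue (like above) and inject the queue: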
export class MyService {
  constructor(@InjectQueue('my-queue') private queue: Queue) {}
}
And then use this.queue.add('namedjob', data); the job name must match the one passed to @Process exactly, since job names are case-sensitive.
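Because the name given to queue.add has to match the name given to @Process exactly, one option is to keep the queue and job names in constants shared by both applications. The sketch below is only an illustration: the file name, the constants, the NamedJobData payload, and the enqueueNamedJob helper are all hypothetical, and the JobQueue interface is a minimal stand-in for the add(name, data) overload of Bull's Queue.

```typescript
// Shared between the main application and the worker
// (hypothetical file: queue.constants.ts).
export const MY_QUEUE = 'my-queue';
export const NAMED_JOB = 'namedjob';

// Minimal shape of the queue as seen by the producer; Bull's Queue has an
// add(name, data) overload that is expected to fit this signature.
interface JobQueue<T> {
  add(name: string, data: T): Promise<unknown>;
}

// Hypothetical payload type, for illustration only.
export interface NamedJobData {
  userId: string;
}

// Producer side: always enqueue under the shared job name so that it
// cannot drift from the name the worker listens for.
export function enqueueNamedJob(
  queue: JobQueue<NamedJobData>,
  data: NamedJobData,
): Promise<unknown> {
  return queue.add(NAMED_JOB, data);
}
```

The worker would then use @Processor(MY_QUEUE) and @Process(NAMED_JOB), and MyService would call enqueueNamedJob(this.queue, data) instead of repeating the string literals.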
Try the above, and if you get stuck, create a repository on GitHub and I'll get you on the right track.
Upvotes: 32