Denis Stephanov

Reputation: 5281

Bull's processor in NestJS never handled queued jobs

I am trying to send DTOs to my Redis queue with the Bull framework and handle them in processors. Occasionally a job reaches the processor (about 1 in 100), but most of the time it fails with the error "job stalled more than allowable limit", and I have no idea how to fix it.

Here is a short introduction; my code is below. I created a queue-api module that serves as a wrapper for my queues, for instance the order queue. I then import this module into the modules that need to publish DTOs to the queues, in my case order-module.

queue-api module files

// queue-api.module.ts
@Module({
  imports: [
    BullModule.registerQueue(
      {
        name: 'order-queue',
        defaultJobOptions: {
          backoff: 10000,
          attempts: Number.MAX_SAFE_INTEGER,
        },
      },
    ),
    ...
  ],
  providers: [OrderQueue],
  exports: [OrderQueue],
})
export class QueueApiModule {}

// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
  constructor(
    @InjectQueue('order-queue')
    private readonly queue: Queue,
  ) {}

  async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
    const job = await this.queue.add('send-submit-mail', dto)
    console.log(`Job ${job.id} created.`)
  }
}

order-module files

// order.module.ts
@Module({
  imports: [
    QueueApiModule,
    ...
  ],
  providers: [
    OrderProcessor,
    ...
  ]
})
export class OrderModule {}

// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
  constructor(private readonly queue: OrderQueue) {}

  @Process('send-submit-mail')
  async onProcessSubmitMail(job: Job): Promise<void> {
    console.log(`Processing of job ${job.id}`)
  }
}

This processor handler is almost never called.

Do you have any idea what is wrong with my code? Thank you in advance.

Upvotes: 1

Views: 5941

Answers (2)

sarkstephan

Reputation: 11

A little late, but it's still better to have it written here.

It happens because of this line: constructor(private readonly queue: OrderQueue) {}

More precisely, it's because of the DI mechanism. The probable reason is that the service is Scope.REQUEST (or one of its injected services is, which makes the host service Scope.REQUEST as well, because the whole injection sub-tree becomes request-scoped).

The @Process() handler is invoked by the queue worker, outside any HTTP request, so there is no HTTP request object for a request-scoped provider to receive.

If you look at the error that results from trying to process the job.data, you'll see something like this (in my case, trying to inject EmailService). The stacktrace array holds the same trace twice; one copy is shown:

TypeError: this.request.get is not a function
    at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)
    at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)
    at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)
    at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)
    at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)
    at async Promise.all (index 0)
    at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)
    at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)
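
The TypeError in that trace can be reproduced without NestJS. The sketch below is a framework-free illustration, not my actual service: the constructor body of EmailService, the HttpRequestLike interface, and fakeJob are hypothetical stand-ins. It assumes the service reads a header in its constructor and that, when the processor is built for a job, the object injected as the "request" is not an HTTP request (here, a job-shaped object).

```typescript
// Hypothetical shape of the HTTP request the service expects to receive.
interface HttpRequestLike {
  get(header: string): string | undefined;
}

// Stand-in for a request-scoped service that reads a header on construction.
class EmailService {
  readonly locale: string;

  constructor(private readonly request: HttpRequestLike) {
    // Throws if `request` has no get() method.
    this.locale = this.request.get('accept-language') ?? 'en';
  }
}

// Inside an HTTP handler the injected object really is a request.
const httpRequest: HttpRequestLike = {
  get: (header) => (header === 'accept-language' ? 'de' : undefined),
};
const fromHttp = new EmailService(httpRequest);

// Assumption: inside a queue processor the injected object is job-shaped
// and therefore has no get() method.
const fakeJob = { id: 1, data: { orderId: 42 } };

function tryConstructFromJob(): string {
  try {
    new EmailService(fakeJob as unknown as HttpRequestLike);
    return 'constructed';
  } catch (err) {
    return err instanceof TypeError ? err.message : String(err);
  }
}

const jobResult = tryConstructFromJob();
console.log(fromHttp.locale); // de
console.log(jobResult);       // a TypeError message ending in "is not a function"
```

If this is indeed the cause, keeping everything injected into the processor in the default (singleton) scope, or moving the request-dependent work out of the constructor, should let the processor be constructed for each job.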

Upvotes: 0

dmitrydigi

Reputation: 53

I'm getting a similar problem, but I haven't drilled down to find the root cause yet. In the meantime I used the bull-repl CLI (npm package bull-repl) to see the queue status.

When the stalled error occurs, no further jobs are triggered after that (it seems the queue is stuck on the failed job). If you run stats in bull-repl, you'll see that there is a job in the Active state. You can remove it manually (using bull-repl), and then the next job will run.

I suspect that QueueScheduler isn't running, and therefore stalled jobs are not taken care of. You can also increase the stalled timeout parameters (there are 2-3 of them, check [https://docs.bullmq.io/bull/important-notes]) to see if it helps. In my case, the job gets stuck when I pause in the debugger.
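
To make the role of those timeout parameters concrete, here is a deliberately simplified model of stalled-job detection. It is not Bull's implementation: runJob and the pause list are hypothetical, and only the setting names (lockDuration, maxStalledCount) and the defaults used below (30000 ms and 1) are taken from Bull's advanced settings. It assumes a worker frozen at a breakpoint cannot renew its job lock.

```typescript
// Simplified model of stalled-job detection; NOT Bull's actual code.
interface StallSettings {
  lockDuration: number;    // ms a job lock stays valid without renewal
  maxStalledCount: number; // stalls tolerated before the job is failed
}

type Outcome = 'completed' | 'failed: job stalled more than allowable limit';

// pausesMs: how long the worker was frozen (e.g. at a breakpoint) on each attempt.
function runJob(pausesMs: number[], settings: StallSettings): Outcome {
  let stalledCount = 0;
  for (const pause of pausesMs) {
    // The lock survived the pause, so this attempt finishes normally.
    if (pause <= settings.lockDuration) {
      return 'completed';
    }
    // The lock expired while the worker was frozen: the job counts as stalled.
    stalledCount += 1;
    if (stalledCount > settings.maxStalledCount) {
      return 'failed: job stalled more than allowable limit';
    }
    // Otherwise the job is handed out again for another attempt.
  }
  // No further pauses recorded: the next attempt runs uninterrupted.
  return 'completed';
}

const bullDefaults: StallSettings = { lockDuration: 30_000, maxStalledCount: 1 };
const relaxed: StallSettings = { lockDuration: 120_000, maxStalledCount: 1 };

// Two 45-second debugger pauses in a row.
const withDefaults = runJob([45_000, 45_000], bullDefaults);
const withRelaxed = runJob([45_000, 45_000], relaxed);

console.log(withDefaults); // failed: job stalled more than allowable limit
console.log(withRelaxed);  // completed
```

In Bull these options live under the queue's settings key, so with NestJS they would presumably go next to name and defaultJobOptions in the object passed to BullModule.registerQueue. A longer lockDuration also delays detection of jobs that are genuinely stuck, so it is a trade-off rather than a fix for the root cause.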

Upvotes: 0
