yuridamata
yuridamata

Reputation: 519

BullMQ and NestJS - Manage Queue Names

I'm building a application using BullMQ and NestJS. It's all working fine, but there is something bugging me.

When register a new queue on my application I do something like that:


    @Module({
       imports: [             
             BullModule.registerQueueAsync({
              name: 'email-queue',
           }),
      ],
      controllers: [EmailController],
      providers: [ ],
    })


Then, when I'm using it on a NestJS service, I'd do something like that:


@Injectable()
export class WhatsappQueueService {
  constructor(
    @InjectQueue('email-queue') private queue: Queue,
  ) {}

  async addMessage(dtoSendMessage: any) {
    const included = await this.queue.add(
      'send-message',
      dtoSendMessage,
    );
    return included;
  }

}

The problem is that I'd like to create a constant with the value 'email-queue', export it from the service and use it on the Module definition. So, I don't have to manually manage the queue names.

When I use the constant, I get a error saying that NestJS can't find the Queue. I believe that's something to do with the @InjectQueue() decorator.

How can I use constants to name my queues?

Upvotes: 0

Views: 582

Answers (1)

DMabulage
DMabulage

Reputation: 874

yes you can use constants

src/queue/constants/queue.constants.ts

export const QUEUE_NAME = 'add-email-vectors-queue';
export const QUEUE_PROCESS = 'save';

export const EMAIL_CACHE_QUEUE_PROCESS = 'email-cache';
export const EMAIL_CACHE_QUEUE_NAME = 'email-cache-queue';
 BullModule.registerQueue({
      name: QUEUE_NAME,
      defaultJobOptions: {
        delay: 1000,
        removeOnComplete: true,
        removeOnFail: true,
        attempts: 3,
      },
    }),
import {
  EMAIL_CACHE_QUEUE_NAME,
  EMAIL_CACHE_QUEUE_PROCESS,
} from './constants/queue.constants';

@Processor(EMAIL_CACHE_QUEUE_NAME)
export class EmailCacheQueueProcessService {
  private readonly logger = new Logger(EmailCacheQueueProcessService.name);

  constructor(
    private readonly outlookService: OutlookService,
    private readonly cacheStoreService: CacheStoreService,
  ) {}

  @Process(EMAIL_CACHE_QUEUE_PROCESS)
  async saveEmailsCacheProcess(job: Job) {
    const emails = await this.outlookService.getEmailsForCaching(
      job.data.token,
      1,
      100,
    );

   
    this.logger.log(`Job completed for ${emails.length} emails in ${EMAIL_CACHE_QUEUE_NAME}`);
  }
}
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { QUEUE_NAME, QUEUE_PROCESS } from './constants/queue.constants';

@Injectable()
export class QueueService {
  constructor(
    @InjectQueue(QUEUE_NAME)
    private queue: Queue,
  ) {}

  /**
   * @param text email body and the id of the email
   * @param userId user id of the database
   * @description Save emails to vector database from queue
  */
  async saveVectorsFromQueue(
    text: { text: string; id: string }[],
    userId: string,
  ) {
    await this.queue.add(QUEUE_PROCESS, {
      text,
      userId,
    });

    console.log('Job added to queue');
  }
}

You can use constants for queue names.

Upvotes: 0

Related Questions