Enkidu

Reputation: 1

NestJs BullMQ: "Flow" feature does not execute the parent and children jobs

I am new to BullMQ and have been trying to implement the "flow" feature, but the jobs I list in the syncVotesTableAndExtractDataFlow variable (which is of type FlowProducer) are never run. This code is inside the FilterVoteDataProducer class.

The syncVotesTable method in my controller class calls syncVotesTableAndExtractData in my FilterVoteDataProducer class.

I have noticed that only the data property of each job is evaluated, and not in the expected sequence: the parent's data is logged first and then the child's, whereas according to the docs it should be the opposite.

In my two consumer classes (FilterVoteDataConsumer and SyncVoteTableConsumer), I have a switch statement inside the process method (overridden from the WorkerHost class) that matches the job name belonging to that queue, but those cases are never run.

My question is: how do I set up the flow feature correctly so that the code in those switch cases runs? Currently I don't see the point of setting the queueName property when adding a job to a flow if that job is never picked up by the process method of the queue it belongs to.

This is my output when I run the program:

Parent job executes LAST <- first the parent is executed

Child job executes FIRST <- then the child is executed

Here is my code:

FilterVoteDataProducer class


import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';
import { Injectable } from '@nestjs/common';
import {
  SYNC_FILTER_VOTES_DATA_QUEUE,
  SYNC_VOTES_TABLE_QUEUE,
  FILTER_VOTES_DATA_JOB,
  SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW,
  SYNC_VOTES_TABLE_JOB,
} from '../common/constants.js';

@Injectable()
export class FilterVoteDataProducer {
  constructor(
    @InjectFlowProducer(SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW)
    private readonly syncVotesTableAndExtractDataFlow: FlowProducer,
  ) {}

  async syncVotesTableAndExtractData() {
    return await this.syncVotesTableAndExtractDataFlow.add({
      name: SYNC_VOTES_TABLE_JOB,
      queueName: SYNC_VOTES_TABLE_QUEUE,
      data: console.log(`Parent job executes LAST`),
      children: [
        {
          name: FILTER_VOTES_DATA_JOB,
          queueName: SYNC_FILTER_VOTES_DATA_QUEUE,
          data: console.log('Child job executes FIRST'),
        },
      ],
    });
  }
}

SyncVoteTableConsumer class


import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import {
  SYNC_VOTES_TABLE_QUEUE,
  SYNC_VOTES_TABLE_JOB,
} from '../common/constants.js';

@Processor(SYNC_VOTES_TABLE_QUEUE, { lockDuration: 600000 })
export class SyncVoteTableConsumer extends WorkerHost {
  constructor() {
    super();
  }
  private readonly logger = new Logger(SyncVoteTableConsumer.name);

  async process(job: Job<any>, token: string | undefined): Promise<void> {
    switch (job.name) {
      case SYNC_VOTES_TABLE_JOB:
        {
          this.logger.verbose(
            `Attempting to sync votes table, job id: ${job.id}`,
          );
          const result = await this.syncVotesTable();
          return result;
        }
        break;

      default:
        throw new Error(`Process ${job.name} not implemented`);
    }
  }

  private async syncVotesTable() {
    console.log(`Table is synced`);
  }
}

FilterVoteDataConsumer class


import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import {
  SYNC_FILTER_VOTES_DATA_QUEUE,
  FILTER_VOTES_DATA_JOB,
} from '../common/constants.js';

@Processor(SYNC_FILTER_VOTES_DATA_QUEUE, { lockDuration: 600000 })
export class FilterVoteDataConsumer extends WorkerHost {
  constructor() {
    super();
  }
  private readonly logger = new Logger(FilterVoteDataConsumer.name);

  async process(job: Job<any>, token: string | undefined): Promise<void> {
    switch (job.name) {
      case FILTER_VOTES_DATA_JOB:
        {
          this.logger.verbose(`Getting specific votes data ${job.id}`);
          this.getSpecVoteData();
        }
        break;

      default:
        throw new Error(`Process ${job.name} not implemented`);
    }
  }

  private async getSpecVoteData() {
    return this.logger.debug(`Votes data obtained`);
  }
}

VoteController class


import { Controller, Get, Post } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { FilterVoteDataProducer } from '../filterVoteData.producer.js';

@Controller('vote-controller')
export class VoteController {
  constructor(private readonly producer: FilterVoteDataProducer) {}

  @Cron(CronExpression.EVERY_5_SECONDS)
  @Post('vote')
  async syncVotesTable() {
    await this.producer.syncVotesTableAndExtractData();
  }
}

Upvotes: 0

Views: 702

Answers (1)

eugenecodes

Reputation: 36

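The output you see is printed when your flow is created, not when your jobs execute. That is because console.log() is called while the object passed to add() is being built, first for the parent's data field and then for the child's. Its return value (undefined) is what ends up in data.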
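To illustrate the point about console.log(), here is a minimal sketch that needs neither BullMQ nor Redis. FlowJobSketch is a hypothetical stand-in for the shape passed to FlowProducer.add(), and the job and queue name strings are placeholders rather than the values of the constants in the question. It only shows that expressions in an object literal are evaluated immediately, in source order, and that a call which returns nothing leaves data undefined.

```typescript
// Hypothetical stand-in for the object passed to FlowProducer.add();
// deliberately not imported from bullmq so the sketch runs on its own.
interface FlowJobSketch {
  name: string;
  queueName: string;
  data?: unknown;
  children?: FlowJobSketch[];
}

// Records messages in call order so the evaluation order is visible.
const calls: string[] = [];
function log(message: string): void {
  calls.push(message);
  console.log(message);
}

// As in the question: log() runs while the literal is being built,
// top to bottom, and its return value (undefined) becomes the job data.
const asPosted: FlowJobSketch = {
  name: 'sync-votes-table', // placeholder name
  queueName: 'sync-votes-table-queue', // placeholder name
  data: log('Parent job executes LAST'),
  children: [
    {
      name: 'filter-votes-data', // placeholder name
      queueName: 'filter-votes-data-queue', // placeholder name
      data: log('Child job executes FIRST'),
    },
  ],
};

// One possible alternative: pass plain serializable data here and do the
// logging inside each worker's process() method instead.
const withPlainData: FlowJobSketch = {
  name: 'sync-votes-table',
  queueName: 'sync-votes-table-queue',
  data: { note: 'parent payload' },
  children: [
    {
      name: 'filter-votes-data',
      queueName: 'filter-votes-data-queue',
      data: { note: 'child payload' },
    },
  ],
};
```

So the two lines in the question's output say nothing about the order in which the workers run. In a flow the parent stays in the waiting-children state until its children have completed, and that only becomes visible from logging done inside process().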

It looks like your jobs are either not being added or not being processed yet.

Make sure you register your queues, the flow producer, and your processors as providers; see the details in the official docs.
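As a rough sketch of what that registration could look like with @nestjs/bullmq: the module name VoteModule, the consumer and controller file paths, and the Redis host and port are assumptions, since the question does not show its module file. Adjust them to your project.

```typescript
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import {
  SYNC_FILTER_VOTES_DATA_QUEUE,
  SYNC_VOTES_TABLE_QUEUE,
  SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW,
} from '../common/constants.js';
import { FilterVoteDataProducer } from '../filterVoteData.producer.js';
// The three paths below are hypothetical; use wherever these classes actually live.
import { FilterVoteDataConsumer } from '../filterVoteData.consumer.js';
import { SyncVoteTableConsumer } from '../syncVoteTable.consumer.js';
import { VoteController } from './vote.controller.js';

@Module({
  imports: [
    // Assumed local Redis instance; change to match your environment.
    BullModule.forRoot({ connection: { host: 'localhost', port: 6379 } }),
    // Both queues referenced by the flow must be registered.
    BullModule.registerQueue(
      { name: SYNC_VOTES_TABLE_QUEUE },
      { name: SYNC_FILTER_VOTES_DATA_QUEUE },
    ),
    // Makes @InjectFlowProducer(SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW) resolvable.
    BullModule.registerFlowProducer({
      name: SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW,
    }),
  ],
  controllers: [VoteController],
  // Processors only start workers if they are listed as providers.
  providers: [
    FilterVoteDataProducer,
    SyncVoteTableConsumer,
    FilterVoteDataConsumer,
  ],
})
export class VoteModule {}
```

If a consumer class is missing from providers, Nest never instantiates it, so no worker listens on that queue and its process() method is never called even though the jobs are sitting in Redis.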

You can check whether your jobs were added to the queue by looking them up in redis-cli with the keys bull:* query, if the default key prefix is used.

Upvotes: 0
