Reputation: 1
I am new to BullMQ and I have been trying to implement "flow" feature but the referenced jobs that I have listed in the syncVotesTableAndExtractDataFlow
variable which is of type FlowProducer
are not ran (this code is inside FilterVoteDataProducer
class.
I am calling the method syncVotesTableAndExtractData
in my controller class which then calls syncVotesTableAndExtractData
(same name) in my FilterVoteDataProducer
class.
I have noticed that only data
property will be ran in both of the jobs but not in the correct sequence, first the parent's data is run and then the children, it should be the opposite according to the docs.
In my 2 consumer classes (FilterVoteDataConsumer
and SyncVoteTableConsumer
) I have created a switch case inside process
method which is extended from WorkerHost
class that waits particularly for the job name which belongs to that queue respectively but they are never ran.
My question is how do I setup the flow feature correctly so that the code in the mentioned switch cases can be run? Currently I don't see the point of listing the queueName property when adding a job to the flow feature if that job won't be called from the process method from the queue to which it belongs.
This is my output when I run the program:
Parent job executes LAST
<- first the parent is executed
Child job executes FIRST
<- then the child is executed
Here is my code:
FilterVoteDataProducer
class
import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';
import { Injectable } from '@nestjs/common';
import {
SYNC_FILTER_VOTES_DATA_QUEUE,
SYNC_VOTES_TABLE_QUEUE,
FILTER_VOTES_DATA_JOB,
SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW,
SYNC_VOTES_TABLE_JOB,
} from '../common/constants.js';
@Injectable()
export class FilterVoteDataProducer {
constructor(
@InjectFlowProducer(SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW)
private readonly syncVotesTableAndExtractDataFlow: FlowProducer,
) {}
async syncVotesTableAndExtractData() {
return await this.syncVotesTableAndExtractDataFlow.add({
name: SYNC_VOTES_TABLE_JOB,
queueName: SYNC_VOTES_TABLE_QUEUE,
data: console.log(`Parent job executes LAST`),
children: [
{
name: FILTER_VOTES_DATA_JOB,
queueName: SYNC_FILTER_VOTES_DATA_QUEUE,
data: console.log('Child job executes FIRST'),
},
],
});
}
}
SyncVoteTableConsumer
class
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import {
SYNC_VOTES_TABLE_QUEUE,
SYNC_VOTES_TABLE_JOB,
} from '../common/constants.js';
@Processor(SYNC_VOTES_TABLE_QUEUE, { lockDuration: 600000 })
export class SyncVoteTableConsumer extends WorkerHost {
constructor() {
super();
}
private readonly logger = new Logger(SyncVoteTableConsumer.name);
async process(job: Job<any>, token: string | undefined): Promise<void> {
switch (job.name) {
case SYNC_VOTES_TABLE_JOB:
{
this.logger.verbose(
`Attempting to sync votes table, job id: ${job.id}`,
);
const result = await this.syncVotesTable();
return result;
}
break;
default:
throw new Error(`Process ${job.name} not implemented`);
}
}
private async syncVotesTable() {
console.log(`Table is synced`);
}
}
FilterVoteDataConsumer
class
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import {
SYNC_FILTER_VOTES_DATA_QUEUE,
FILTER_VOTES_DATA_JOB,
} from '../common/constants.js';
@Processor(SYNC_FILTER_VOTES_DATA_QUEUE, { lockDuration: 600000 })
export class FilterVoteDataConsumer extends WorkerHost {
constructor() {
super();
}
private readonly logger = new Logger(FilterVoteDataConsumer.name);
async process(job: Job<any>, token: string | undefined): Promise<void> {
switch (job.name) {
case FILTER_VOTES_DATA_JOB:
{
this.logger.verbose(`Getting specific votes data ${job.id}`);
this.getSpecVoteData();
}
break;
default:
throw new Error(`Process ${job.name} not implemented`);
}
}
private async getSpecVoteData() {
return this.logger.debug(`Votes data obtained`);
}
}
VoteController
class
import { Controller, Get, Post } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { FilterVoteDataProducer } from '../filterVoteData.producer.js';
@Controller('vote-controller')
export class VoteController {
constructor(private readonly producer: FilterVoteDataProducer) {}
@Cron(CronExpression.EVERY_5_SECONDS)
@Post('vote')
async syncVotesTable() {
await this.producer.syncVotesTableAndExtractData();
}
}
Upvotes: 0
Views: 702
Reputation: 36
You see the output when your flow is initialized, not when your jobs execute. This happens when you assign console.log()
to first parent and then child job data field.
Looks like your jobs are not added or not running yet.
Make sure you register your queue, flow producer and your processor as a provider, see details in the official docs
You may check if your jobs are added to the queue by looking them up in redis-cli
by running keys bull:*
query if default key prefix is used.
Upvotes: 0