Naga
Naga

Reputation: 21

How to send notifications or mail from NestJS using Server-Sent Events for bulk-inserted records?

In my code, the create method in the AllocateVoucherController controller invokes the create method in AllocateVoucherService. That method creates the record and emits the after-allocate-voucher event; that event invokes the createVoucherEntryForEmployee method in the same service. In the createVoucherEntryForEmployee method I add a job to the allot-voucher queue, and that queue is processed by the AllotVoucherConsumer processor class.

In that processor class I select the employees the voucher is allotted to from the employee table using Prisma, and then create the records (200) in the employeeVoucher table.

What I need is this: after the records are created in the employeeVoucher table (i.e. after the queue job has finished), I want to emit a new event. That event needs to invoke the updatedEmployeeEvent method in AllocateVoucherController, and via that method the employees get a notification.

For example, I allocate vouchers to employees by department, but not only by department: also by grade, by experience, etc.

So, say there are 100 employees in the HR department and 200 employees in the Sales department, and I allocate a voucher to the Sales department. Only those 200 Sales employees should get the notification, not the 100 in HR, even though all 300 employees are subscribed to the same EventSource (API).

Is that possible using Server-Sent Events? I don't want to use sockets.

My Controller :

export class AllocateVoucherController {
 constructor(
    private readonly allocateVoucherService: AllocateVoucherService,
    private eventEmitter: EventEmitter2
 ) { }

 @Post()
 @ApiOkResponse({ type: AllocateVoucherEntity })
 create(@Body() createAllocateVoucherDto: CreateAllocateVoucherDto) {
    return this.allocateVoucherService.create(createAllocateVoucherDto);
 }

 @OnEvent('after-pushed-voucher-to-employee')
 @Sse('vouchered-employee')
 updatedEmployeeEvent(): Observable<MessageEvent> {
    try {
        return fromEvent(this.eventEmitter, 'after-added-voucher').pipe(
            map((data) => {
                return new MessageEvent('after-added-voucher', { data: data } as MessageEvent)
            }),
        );
    } catch (e) {
        return e;
    }
 }
}

My Service:

export class AllocateVoucherService {
    constructor(
        private prisma: PrismaService,
        private eventEmitter: EventEmitter2,
        @InjectQueue('allotVoucher') private allotVoucherQueue: Queue
    ) { }

    async create(createAllocateVoucherDto: CreateAllocateVoucherDto) {
        const allocatedVoucher = await this.prisma.allocateVoucher.create({ data: createAllocateVoucherDto });

        if (allocatedVoucher && allocatedVoucher.id) {
            this.eventEmitter.emit('after-allocate-voucher', allocatedVoucher)
        }

        return allocatedVoucher
    }

    @OnEvent('after-allocate-voucher')
    async createVoucherEntryForEmployee(payload: AllocateVoucherEntity) {

        const dictionary = await this.prisma.dictionary.findUnique({ where: { id: payload.assignId } });
    
        try {
            await this.allotVoucherQueue.add(
                'allot-voucher',
                {
                    dictionary: dictionary,
                    allocatedVoucher: payload,
                },
                {
                    delay: 3000,
                    removeOnComplete: true, // removes the job from Redis on successful completion
                    removeOnFail: true,     // removes the job from Redis on failure
                },
            );
    
            console.log('job added to queue');
        } catch (error) {
            return { status: 'error-xxx', message: error.message };
        }

    }
}

My Processor:

export class AllotVoucherConsumer extends WorkerHost{

    async process(job: Job<any, any, string>): Promise<any> {
        
        switch (job.name) {
            case 'allot-voucher': {

                const employees = await prisma.employee.findMany({
                    where: {
                        OR: [
                            { gradeId: job.data.dictionary.id },
                            { brandId: job.data.dictionary.id },
                            { departmentId: job.data.dictionary.id },
                            { companyId: job.data.dictionary.id },
                        ]
                    }
                });

                try {
                    await Promise.all(
                        employees.map((record: any) =>
                            prisma.employeeVoucher.createMany({
                                data: {
                                    amount: job.data.allocatedVoucher.amount,
                                    unit: job.data.allocatedVoucher.unit,
                                    noUnit: job.data.allocatedVoucher.noUnit,
                                    assignEmployeeId: record.id,
                                    voucherId: job.data.allocatedVoucher.voucherId
                                }
                            })
                        )
                    );
                } catch (err) { 
                    console.log(err);
                }
            }
        }
    }

    @OnWorkerEvent('active')
    onActive(job: Job) {
        console.log(
        `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
        );
    }

    @OnWorkerEvent('completed')
    onComplete(job: Job) {
        console.log(
        `Completed job ${job.id} of type ${job.name} with data ${job.data}...`,
        );
    }
}

In the frontend React component:

const TestComponentTwo = () => {

const { colorMode } = useColorMode();
const [employees, setEmployees] = useState([]); // Initialize as an array
const [isLoading, setIsLoading] = useState(false);

useEffect(() => {
    console.log("Setting up fetch and EventSource...");
    setIsLoading(true);

    fetch('http://127.0.0.1:3000/allocate-voucher/voucher-by-employee?assignEmployeeId=1')
        .then((response) => response.json())
        .then((data) => {
            console.log("Fetched initial data:", data);
            setEmployees(data);
            setIsLoading(false);
        })
        .catch((error) => {
            console.error("Error fetching initial data:", error);
            setIsLoading(false);
        });

    const eventSource = new EventSource('http://localhost:3000/allocate-voucher/vouchered-employee');

    eventSource.onopen = () => {
        console.log("EventSource connection opened.");
    };

    eventSource.addEventListener('after-added-voucher', (event) => {
        console.log("Received custom event:", event);
        const newEmployee = JSON.parse(event.data);
        setEmployees((prevEmployees) => [newEmployee, ...prevEmployees]);
    });

    eventSource.onerror = (error) => {
        console.error("EventSource encountered an error or disconnected:", error);
        eventSource.close();
    };

    return () => {
        console.log("Closing EventSource...");
        eventSource.close();
    };
}, []);

return (
    <>
        {employees?.length > 0 &&
            employees?.map((data) => (
                <Tr key={data.id}>
                    <Td> {data.assignEmp.firstName}</Td>
                    <Td> {data.voucher.name}</Td>
                    <Td> {data.amount}</Td>
                    <Td> {data.unit}</Td>
                    <Td> {data.isActive}</Td>
                </Tr>
            ))
        }
    
    </>
)
};

export default TestComponentTwo;

Upvotes: 0

Views: 41

Answers (0)

Related Questions