Arey Jeremy
Arey Jeremy

Reputation: 77

Bull queue job with fetch blocking the node event loop?

we have started to use Bull queue to process our background tasks (Heroku platform, Node.js, Redis). Time to time it happens that some job gets stuck in the active state and no other tasks are being processed - they are being put into the wait state instead. So it seems as the task is still being processed and taking all the resources. But the request timeout is not being hit in this case and it is just stuck. Also we do not really believe it is a performance issue as we assume the job would get stalled or failed, which is not the case.

The purpose of our jobs is nothing but light data processing and sending it to an external system. Below you can see basically what it does (shortly). We have no idea what could be causing this as the app can work for few days without being stuck. When it gets stuck, it is always right before sending the data or actually the "Dynamic-Data: Starts sending data" log is the last log being printed in this case, no errors, not other logs. So we are only guessing it might be caused by the fetch? Or any hanged promise? process? Do you guys see anything that could be potentially causing such behavior? I would really appreciate any help. Thank you in advance.

workQueue.process(maxJobsPerWorker = 10, async (job, done) => {
        try { 
            await data_process(job.data);
            done();
        }
        catch (error) {
            done(error);
        }
 });


async function data_process(data) {
    try { 
        let responses = [];
        let completed_requests = 0;
        let fetchPromises = data.systemsToSendDataTo.map(async (systemData) => { 
            let options = {
                method: 'POST',
                url: systemData.URL,
                headers: {
                    'Content-Type': 'application/json',
                },
                agent: proxyAgent,
                body: JSON.stringify({
                    "params": {
                        "data": systemData.data,
                    }
                })
            };

            winston.debug("Dynamic-Data: Starts sending data");
            try {
                const JSONresponse = await sendRequest(options);
                responses.push(JSONresponse);
                completed_requests++;
                handleFinalResponse();
            } catch (err) {
                responses.push(err);
                completed_requests++;
                handleFinalResponse();
            }

            function handleFinalResponse() {
                if (completed_requests === systemsToSendDataTo.length) {
                    winston.info("Info: ", {responses: responses});
                }
            }
        });

        await Promise.all(fetchPromises);
        winston.debug("Dynamic-Data: All fetch promises resolved.");
    }
    catch(error) {
        winston.info("Dynamic-Data: Error:", { err: error});
    } 
}

async function sendRequest(options) {
    const controller = new AbortController();
    const signal = controller.signal;

    // Set the signal in the options
    options.signal = signal;

    // Set a timeout to abort the request
    const timeoutId = setTimeout(() => {
        controller.abort();
    }, 22000); // Timeout in milliseconds

    try {
        const response = await fetch(options.url, options);
        clearTimeout(timeoutId);
        return await response.json();
    } catch (err) {
        console.log(err)
        throw err;
    } 
}

// Initialize the clustered worker process (where workers = 1)
throng({ workers, start });

Upvotes: 1

Views: 107

Answers (0)

Related Questions