Reputation: 1
This is my connection pool
require("dotenv").config();
const mysql = require("mysql2/promise");
const db = mysql.createPool({
host: process.env.Dbhost,
user: process.env.Dbusername,
password: process.env.Dbpassword,
database: process.env.Dbdatabase,
port: process.env.Dbport,
waitForConnections: true,
connectionLimit: 230,
queueLimit: 5,
multipleStatements: true,
});
module.exports = db;
This is a background job that is being executed by multiple workers as a background job
const { parentPort, workerData } = require("worker_threads");
const db = require("../db");
require("dotenv").config();
const axios = require("axios");
const { GoogleGenerativeAI } = require("@google/generative-ai");
const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
const processUnprocessedCalls = async () => {
const connection = await db.getConnection();
try {
await connection.beginTransaction();
// Select the unprocessed calls with a FOR UPDATE lock to prevent double processing
const [unprocessedCalls] = await connection.query(
`SELECT * FROM unProcess_call
WHERE ProcessedStatus = 0
ORDER BY priority ASC
LIMIT ${workerData.batchSize}
FOR UPDATE`
);
for (const call of unprocessedCalls) {
let transcript = "";
let response_text = "";
let statusDeepgram;
// If call recording is not found, continue processing but leave transcript and response as empty
if (
call.callRecording !== "" &&
call.callRecording !== null &&
call.callRecording !== undefined
) {
const result = await getTranscriptAndResponse(call.callRecording);
transcript = result.transcript;
response_text = result.response_text;
statusDeepgram = result.deepgramStatus;
}
let qualifiedstatus = response_text?.trim();
console.log("Q/A status", qualifiedstatus);
if (statusDeepgram === true) {
await connection.query(
`INSERT INTO Processed_call (callStartDate, QuerystringHandleTime, QuerystringAgent, QuerystringQueue, QuerystringPhoneNumber, QuerystringRecording, QuerystringFirstName, QuerystringLastName, QuerystringEmail, QuerystringCaseID, QuerystringDebtamout, QuerystringSource, QuerystringStatus, QuerystringAgentname, QuerystringAgentemail, QuerystringAgentgroup, ProcessedStatus, Created_At, Processed_At, QualifiedStatus , callUrl, callSumarry)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
[
call.callStartDate,
call.QuerystringHandleTime,
call.QuerystringAgent,
call.QuerystringQueue,
call.QuerystringPhoneNumber,
call.QuerystringRecording || null,
call.QuerystringFirstName,
call.QuerystringLastName,
call.QuerystringEmail,
call.QuerystringCaseID,
call.QuerystringDebtamout,
call.QuerystringSource,
call.QuerystringStatus,
call.QuerystringAgentname,
call.QuerystringAgentemail,
call.QuerystringAgentgroup,
1,
call.Created_At,
new Date(),
qualifiedstatus === "Qualified" ? 1 : 0,
call.callRecording || null,
transcript || "",
]
);
// Mark the call as processed
await connection.query(
"UPDATE unProcess_call SET ProcessedStatus = 1 WHERE id = ?",
[call.Id]
);
console.log("sucess fully updated the data");
} else {
await connection.query(
"UPDATE unProcess_call SET ProcessedStatus = 0, priority = priority + 1 WHERE id = ?",
[call.Id]
);
}
}
await connection.commit();
console.log(`Worker ${workerData.workerId}: Processing complete`);
} catch (error) {
await connection.rollback();
console.error(
`Worker ${workerData.workerId}: Error processing calls:`,
error
);
} finally {
connection.release();
}
parentPort.postMessage("done");
};
// Helper function to get transcript and AI-generated response
async function getTranscriptAndResponse(audio_url) {
let attemptUrl = audio_url;
while (true) {
try {
const deepgramResponse = await axios.post(
"https://api.deepgram.com/v1/listen",
{ url: attemptUrl },
{
headers: {
Authorization: `Token ${process.env.DEEPGRAM_API_KEY}`,
"Content-Type": "application/json",
},
params: {
model: "nova-2",
language: "en",
summarize: "v2",
intents: true,
smart_format: true,
sentiment: true,
},
}
);
const alternatives =
deepgramResponse.data.results.channels[0].alternatives;
const allTranscripts = alternatives.map((alt) => alt.transcript);
const resultgot = allTranscripts[0];
const { result, status } = await checkQualification(resultgot);
return {
transcript: resultgot,
response_text: result,
deepgramStatus: true,
};
} catch (error) {
if (error.response && error.response.status === 400) {
try {
console.log(error.response.data);
attemptUrl = getPreviousDateUrl(attemptUrl);
} catch (urlError) {
console.log("error in date downgrading the date", urlError);
break;
}
} else {
console.log("error without response", error);
break;
}
}
}
return {
transcript: `Deepgram Failed to get transcript of the call`,
response_text: "",
deepgramStatus: false,
};
}
// Function to check qualification using Google Generative AI
async function checkQualification(transcript) {
try {
const chatSession = genAI
.getGenerativeModel({
model: "gemini-1.5-flash",
})
.startChat({
generationConfig: {
temperature: 1,
topP: 0.95,
topK: 64,
maxOutputTokens: 8192,
responseMimeType: "text/plain",
},
history: [
{
role: "user",
parts: [
{
text: "Role: QA Tester for Tax Resolution Company\n\nObjective: Determine if the client's call recording qualifies based on tax debt amount.\n\nQualification Criteria:\n\nQualified: $10,000 or more in tax debt\nUnqualified: Less than $10,000 in tax debt\nInstructions:\n\nReview the transcript of the client's call recording.\nDetermine if the call is Qualified or Unqualified based on the tax debt mentioned.\nProvide an exact qualification status without any summary or additional commentary.",
},
],
},
{
role: "model",
parts: [
{
text: "Please provide me with the transcript of the client's call recording. I will then determine if the call is Qualified or Unqualified based on the tax debt mentioned.",
},
],
},
],
});
// Send the transcript to the chat session
const result = await chatSession.sendMessage(transcript);
return { result: result.response.text(), status: true };
} catch (error) {
console.error("Error in qualification check:", error);
return {
result: "occurred while processing qualification check",
status: false,
};
}
}
// Function to get the url of back date
function getPreviousDateUrl(url) {
if (typeof url !== "string" || !url.includes("/")) {
throw new Error("Invalid URL format.");
}
const segments = url.split("/");
const lastSegment = segments[segments.length - 1];
const exactDate = lastSegment.split("-")[0];
const year = exactDate.slice(0, 4);
const month = exactDate.slice(4, 6);
const day = exactDate.slice(6, 8);
const originalDate = new Date(`${year}-${month}-${day}`);
const datePart = segments[segments.length - 2];
if (!/^\d{4}-\d{2}-\d{2}$/.test(datePart)) {
throw new Error("Invalid date format. Expected format: YYYY-MM-DD.");
}
const formattedDate = new Date(datePart);
if (isNaN(formattedDate.getTime())) {
throw new Error("Invalid date value.");
}
formattedDate.setDate(formattedDate.getDate() - 1);
const diffInDays = (originalDate - formattedDate) / (1000 * 60 * 60 * 24);
if (diffInDays > 1) {
throw new Error(
"The formatted date is more than one day before the original date."
);
}
return `${
process.env.CallRecordingBaseUrl
}/${formattedDate.getFullYear()}-${String(
formattedDate.getMonth() + 1
).padStart(2, "0")}-${String(formattedDate.getDate()).padStart(
2,
"0"
)}/${lastSegment}`;
}
// Start processing as soon as worker is created
processUnprocessedCalls();
This is job starter in background with multiple worker
const { Worker } = require("worker_threads");
const { CronJob } = require("cron");
const path = require("path");
const { promisify } = require("util");
const fs = require("fs");
// Constants for retries and delays
const MAX_RETRIES = 3;
const RETRY_DELAY_MS = 2000; // 2 seconds between retries
// Flag to track if a job is already running
let isJobRunning = false;
// Path to the worker file
const workerPath = path.resolve(__dirname, "../worker/rawCall-worker.js");
// Utility function to wait (for retry delay)
const delay = promisify(setTimeout);
// Function to create a worker
const startWorker = (workerId, batchSize, attempt = 0) => {
return new Promise((resolve, reject) => {
console.log(`Starting worker ${workerId} (attempt ${attempt + 1})...`);
const worker = new Worker(workerPath, {
workerData: {
workerId,
batchSize,
},
});
worker.on("message", (message) => {
console.log(`Worker ${workerId} completed successfully.`);
resolve();
});
worker.on("error", async (error) => {
console.error(`Worker ${workerId} encountered an error:`, error);
// Retry if failed
if (attempt < MAX_RETRIES) {
console.log(
`Retrying worker ${workerId} in ${RETRY_DELAY_MS / 1000} seconds...`
);
await delay(RETRY_DELAY_MS);
return startWorker(workerId, batchSize, attempt + 1)
.then(resolve)
.catch(reject);
} else {
console.error(
`Worker ${workerId} failed after ${MAX_RETRIES} attempts.`
);
reject(
new Error(`Worker ${workerId} failed after ${MAX_RETRIES} retries`)
);
}
});
worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker ${workerId} exited with code ${code}`));
}
});
});
};
const startWorkers = async () => {
const batchSize = 10; // Number of calls per worker
const numWorkers = 3; // Number of workers to start
const workers = [];
for (let i = 1; i <= numWorkers; i++) {
workers.push(startWorker(i, batchSize));
}
await Promise.all(workers);
};
// Cron job to start the workers every 3 minutes
const rawCallProcessWithWorker = new CronJob(
"*/5 * * * *",
async function startJob() {
if (isJobRunning) {
console.log("Previous job is still running, skipping this cycle.");
return;
}
console.log("Starting workers to process calls...");
isJobRunning = true;
const startTime = new Date();
try {
await startWorkers();
console.log(
`All workers completed successfully. Time taken: ${
(new Date() - startTime) / 1000
} seconds.`
);
} catch (err) {
console.error("Error during workers execution:", err);
} finally {
isJobRunning = false;
console.log("rawCallProcess completed.");
}
}
);
// Graceful shutdown handler
process.on("SIGINT", () => {
console.log("Received SIGINT. Gracefully shutting down the cron job.");
rawCallProcessWithWorker.stop();
process.exit(0);
});
// Graceful shutdown for unexpected errors
process.on("uncaughtException", (err) => {
console.error("Uncaught exception:", err);
rawCallProcessWithWorker.stop();
process.exit(1);
});
module.exports = rawCallProcessWithWorker;
What is happening is that this is creating 3 database connections every 5 minutes and after the job is finished it works. The connection it created does not release. the cause of reaching the maximum connection limit to the database.
Upvotes: 0
Views: 16