Reputation: 91
below is my file upload function with the respective files with the router initialization, I am trying to stream write a file in this code, I am still relatively new to the concepts of streaming in nodejs and uWebsocket at this low level considering I've used express for most of my time with nodejs development. To manage backpressure I've added some conditions which would pause the res(uWs.HttpResponse) from further sending the data & once the data is drained from the writeStream I resume the response, what I don't understand is the consoles that I am getting immediately the first console console.log("Backpressure detected, pausing response.") which is strange for me because at the start it the writeStream should be empty? furthermore the processing pauses infinitely, after some tries at Backpressure detected, pausing response and no other console is logged it does not resume the response or drain the file for some reason? even the file does not finish writing & that console is also not logged it just pauses, is there something I may be doing wrong? or something related to uWs.HttpResponse?
I have tried uploading the file using postman & curl, it's the same in both, on the other hand if I use the nodejs:http module for this streaming this whole implementation with pausing/resuming works without an issue.
Would really appreciate any help, thanks :)
funcs/index.ts // File Upload Function
function streamFileUpload(res: HttpResponse, req: HttpRequest) {
const filename = `upload_${Date.now()}.bin`;
const filepath = path.join(__dirname, "../uploads", filename);
const writeStream = fs.createWriteStream("testfile.txt");
let isPaused = false;
writeStream.on("drain", () => {
if (isPaused) {
console.log("Draining complete, resuming response.");
isPaused = false;
res.resume();
}
});
writeStream.on("finish", () => {
console.log("File upload complete.");
res.writeStatus("200 OK").end(JSON.stringify({ success: true, filepath }));
});
writeStream.on("error", (err) => {
console.error("Error writing to file:", err.message);
res
.writeStatus("500 Internal Server Error")
.end(JSON.stringify({ error: "File upload failed" }));
});
res.onData((ab: any, isLast: any) => {
const chunk = Buffer.from(ab);
const backpressure = writeStream.write(chunk);
if (!backpressure) {
console.log("Backpressure detected, pausing response.");
isPaused = true;
res.pause();
}
if (isLast) {
console.log("Received last chunk, ending file stream.");
writeStream.end();
}
});
res.onAborted(() => {
console.error("Request aborted by the client.");
writeStream.destroy();
});
}
router.ts // Router that calls the function
type UWSApp = ReturnType<typeof uWs.App>;
export default (app: UWSApp) => {
app.post("/api/upload", (res, req) => {
setCORSHeaders(res);
streamFileUpload(res, req);
});
};
app.ts
const app = uWs.App();
// registering the routes
registerUserRoutes(app);
indexRoutes(app);
app
.ws("/*", {
compression: uWs.SHARED_COMPRESSOR,
maxPayloadLength: 16 * 1024 * 1024,
idleTimeout: 10,
open: (ws) => {
console.log("A WebSocket connected!");
ws.send(JSON.stringify({ message: "thanks for connecting" }));
},
message: (ws, message, isBinary) => {
let data = Buffer.from(message).toString();
console.log("Received data:", data);
redisPublisher.publish("chatroom", data);
messageFunction(ws as any, message);
},
drain: (ws) => {
console.log("WebSocket backpressure: " + ws.getBufferedAmount());
},
close: (ws, code, message) => {
console.log("WebSocket closed");
},
})
.get("/test", (res: HttpResponse, req: HttpRequest) => {
console.log("testing this route");
// res.writeHeader();
// res.
res.onAborted(() => {
res.close();
});
});
(async () => {
try {
const result = await db.execute(`SELECT NOW()`);
console.log("Postgres & Server is up & running");
app.listen(3200, (listenSocket) => {
if (listenSocket) console.log("listening on port 3200");
});
} catch (error) {
console.error("Failed to connect to the database:", error);
}
await redisClient.connect();
await redisPublisher.connect();
await redisClient.subscribe("chatroom", (message: string) => {
const parsed: MessagePayload = JSON.parse(message);
console.log("publishing message to chat room=>", parsed.chatRoomId);
console.log("parsed message", parsed.message);
app.publish(parsed.chatRoomId, JSON.stringify(parsed.message));
});
})();
Upvotes: 0
Views: 34