Reputation: 8268
I am trying to read from a stream and process the data one chunk at a time.
The problem is that the processing logic is an asynchronous method (which returns a promise). Here's an example:
stream.on("data", async (data) => {
await db.collection("mydb").insertMany(data)
}).on("end", () => {
console.log("finished")
})
If I run this, I think it will iterate through the stream and make many concurrent insertMany
requests to the database, because the handler doesn't wait for each insertMany
to finish before the next data event fires, which will overload the database. I want to make sure these inserts run only one at a time.
How can I do this?
Upvotes: 1
Views: 954
Reputation: 708046
First off, unless your stream is in object mode, there's no guarantee that what arrives in a given data
event is exactly what you want to insert into your database. So, you may have to parse the stream to collect the right set of data for each insert.
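For example, if the stream happens to carry newline-delimited JSON (purely an assumption for illustration), you'd buffer the raw chunks and only act on complete lines, something like this:

let leftover = "";

stream.on("data", (chunk) => {
    // accumulate raw chunks; the last piece after splitting may be an
    // incomplete record, so carry it over to the next data event
    leftover += chunk.toString();
    const lines = leftover.split("\n");
    leftover = lines.pop();
    const docs = lines.filter(Boolean).map(line => JSON.parse(line));
    // docs is now an array of complete records ready to insert
});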
Then, if you want to only be doing one insert at a time, there are a couple of strategies:
You can pause the stream before you call the insert and then when the insert finishes, you can resume the stream.
You can allow the stream to continue reading and firing data
events, and queue up the data into some sort of queue from which you insert one item at a time. You can either use an actual queue or chain promises (see the sketch after the pause example below).
Here's how the pause option looks:
stream.on("data", async (data) => {
// sort out data into whole chunks, a stream (unless it's in "object mode")
// can give you data events for any arbitrary chunk of data, not just the
// chunks you may want to insert into your database
stream.pause();
try {
await db.collection("mydb").insertMany(data);
stream.resume();
} catch(e) {
// decide what you're doing here if there's an error inserting
// stream will be paused unless you resume it here
}
}).on("end", () => {
console.log("finished")
})
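And if you'd rather not pause the stream, here's a rough sketch of the promise-chaining version of the queue idea (error handling omitted for brevity):

let chain = Promise.resolve();

stream.on("data", (data) => {
    // queue each insert behind the previous one so only one runs at a time
    chain = chain.then(() => db.collection("mydb").insertMany(data));
}).on("end", () => {
    // wait for the last queued insert to settle before reporting done
    chain.then(() => console.log("finished"));
});

Keep in mind this version keeps reading while inserts are pending, so if the stream is much faster than the database the queued data can pile up in memory; the pause/resume version avoids that.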
Upvotes: 2