Projjol

Reputation: 1375

createWriteStream in node seems to execute 'end' event before 'data'

I am unable to understand how the event loop is processing my snippet. This is the code I am running:

const readAndUpload =  () => {
    fs.createReadStream('filename.csv')
    .pipe(csv())
    .on('data', ((row: any) => {
        const file = fs.createWriteStream("file.jpg");
        var url = new URL(row.imageURL)
        // choose whether to make an http or https request
        let client = (url.protocol=="https:") ? https : http
        const request = client.get(row.imageURL, function(response:any) {
            // file save
            response.pipe(file);
            console.log('file saved')
            let filePath = "file.jpg";
            let params = {
                Bucket: 'bucket-name',
                Body : fs.createReadStream(filePath),
                Key : "filename.jpg"
            };
            // upload to s3
            s3.upload(params, function (err: any, data: any) {
                //handle error
                if (err) {
                  console.log("Error", err);
                }

                //success
                if (data) {
                  console.log("Uploaded in:", data.Location);
                  row.imageURL = data.Location
                  writeData.push(row)
                //   console.log(writeData)
                }
              });
        });
    }))
    .on('end', () => {
        console.log("done reading")
        const csvWriter = createCsvWriter({
            path: 'out.csv',
            header: [
                {id: 'id', title: 'some title'}
            ]
        });
        csvWriter
            .writeRecords(writeData)
            .then(() => console.log("The CSV file was written successfully"))
    })
}

Going by the log statements that I have added, "done reading" and "The CSV file was written successfully" are printed before "file saved". My understanding was that the end event is called after the data event, so I am unsure of where I am going wrong.

Thank you for reading!

Upvotes: 0

Views: 2178

Answers (1)

jfriend00

Reputation: 707318

I'm not sure if this is part of the problem or not, but you've got an extra set of parens in this part of the code. Change this:

.on('data', ((row: any) => {
     .....
})).on('end', () => {

to this:

.on('data', (row: any) => {
     .....
}).on('end', () => {

And, if the event handlers are set up properly, your .on('data', ...) event handler gets called before the .on('end', ....) for the same stream. If you put this:

console.log('at start of data event handler');

as the first line in that event handler, you will see it get called first.

But your data event handler starts multiple asynchronous operations, and nothing in your code makes the end event wait for all of that processing to finish. Since that processing takes a while, it's natural that the end event would occur before you're done running all the asynchronous code started in the data event.


In addition, if you can ever have more than one data event (which one normally would), you're going to have multiple data events in flight at the same time, and since you're using a fixed filename, they will probably overwrite each other.
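One way to avoid the filename collision is to derive a distinct local name for each row. This is only a sketch: localFileName, the rowIndex counter, and the '.jpg' fallback are hypothetical names and choices, not something from your code.

```typescript
import * as path from 'node:path';

// Hypothetical helper: builds a distinct local file name for each CSV row so
// that concurrent downloads do not write to the same path.
function localFileName(rowIndex: number, imageURL: string): string {
  // Take the extension from the URL path; fall back to '.jpg' (an assumption).
  const ext = path.posix.extname(new URL(imageURL).pathname) || '.jpg';
  return `image-${rowIndex}${ext}`;
}
```

You would then pass the result to fs.createWriteStream() and reuse the same name when building the upload parameters.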


The usual way to solve something like this is to call stream.pause() to pause the readstream at the start of the data event processing and then, when all your asynchronous work is done, call stream.resume() to let it start going again.

You will need to get the right stream in order to pause and resume. You could do something like this:

let stream = fs.createReadStream('filename.csv')
 .pipe(csv());

stream.on('data', (row: any) => {
   stream.pause();
   ....
});

Then, way inside your s3.upload() callback, you can call stream.resume(). You will also need much, much better error handling than you have, or things will just get stuck if you get an error.
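To make the pause/resume idea concrete, here is a minimal, self-contained sketch. It makes several assumptions: slowWork is a hypothetical stand-in for the download plus s3.upload() step, Readable.from() stands in for the CSV stream, and processRows is an illustrative name. The sketch also counts in-flight work instead of relying on 'end' arriving last, since pausing inside the final data event does not necessarily delay 'end'.

```typescript
import { Readable } from 'node:stream';

// Hypothetical stand-in for the per-row download + upload work.
function slowWork(row: string): Promise<string> {
  return new Promise((resolve) => setTimeout(() => resolve(row.toUpperCase()), 10));
}

// Processes rows one at a time; resolves once the stream has ended AND
// every row's asynchronous work has completed.
function processRows(rows: string[]): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const results: string[] = [];
    const stream = Readable.from(rows);
    let pending = 0;
    let ended = false;
    const finishIfDone = () => {
      if (ended && pending === 0) resolve(results);
    };

    stream.on('data', (row: string) => {
      stream.pause(); // hold back further 'data' events while this row is busy
      pending++;
      slowWork(row)
        .then((result) => {
          results.push(result);
          pending--;
          stream.resume(); // let the next row through
          finishIfDone();
        })
        .catch((err) => {
          stream.destroy(); // stop reading instead of hanging while paused
          reject(err);
        });
    });
    stream.on('end', () => {
      ended = true;
      finishIfDone();
    });
    stream.on('error', reject);
  });
}
```

In your code, the place where this sketch resolves is where the output CSV would be written, rather than directly in the 'end' handler.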

It also looks like you have other concurrency issues too where you call:

response.pipe(file);

And you then attempt to use the file without actually waiting for that .pipe() operation to be done (which is also asynchronous). Overall, this whole logic really needs a major cleanup. I don't understand what exactly you're trying to do in all the different steps to know how to write a totally clean and simpler version.
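As a rough illustration of waiting for the write to finish before using the file, the sketch below uses pipeline() from stream/promises instead of a bare .pipe() call. saveToFile and demo are hypothetical names, and Readable.from() stands in for the HTTP response stream.

```typescript
import { createWriteStream } from 'node:fs';
import { readFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Writes `source` to `destPath`; the promise settles only after the write
// stream has finished, or rejects if either stream fails.
async function saveToFile(source: Readable, destPath: string): Promise<void> {
  await pipeline(source, createWriteStream(destPath));
}

// Stand-in usage: in the question, `source` would be the http(s) response.
async function demo(): Promise<string> {
  const destPath = join(tmpdir(), `pipe-demo-${process.pid}.txt`);
  await saveToFile(Readable.from(['hello ', 'world']), destPath);
  // Reading (or uploading) the file is only safe after the await above.
  return readFile(destPath, 'utf8');
}
```

The upload step would go after the await, at the point where the file is known to be complete.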

Upvotes: 2
