Parse csv file from S3 using Lambda and Node Stream

I'm trying to code a lambda that triggers an s3 bucket and gets a CSV file when it is uploaded, and parse this file.

I'm using: Node 14x

This is the code:

import { S3Event } from 'aws-lambda';
import { S3 } from 'aws-sdk';
import * as csv from 'fast-csv';

const s3 = new S3({ apiVersion: 'latest' });

export async function hello(event: S3Event, context, cb) {
  event.Records.forEach(async (record) => {
    const bucket = record.s3.bucket.name;
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));

    const params: S3.GetObjectRequest = {
      Bucket: bucket,
      Key: key,
    };

    const stream = s3.getObject(params).createReadStream();

    console.log({ stream });

    csv.parseStream(stream, {
      headers: true
    }).on('data', data => { console.log(data); })
      .on('error', error => console.error(error))
      .on('end', (rowCount: number) => console.log(`Parsed ${rowCount} rows`));

    console.log('processo 01 acabou!');
  });
}

When I execute this lambda I'm not receiving anything. In console.log(stream) I'm receiving a PassTrought object...

stream: PassThrough {
    _readableState: ReadableState {
      objectMode: false,
      highWaterMark: 16384,
      buffer: BufferList { head: null, tail: null, length: 0 },
      length: 0,
      pipes: [],
      flowing: null,
      ended: false,
      endEmitted: false,
      reading: false,
      sync: false,
      needReadable: false,
      emittedReadable: false,
      readableListening: false,
      resumeScheduled: false,
      errorEmitted: false,
      emitClose: true,
      autoDestroy: true,
      destroyed: false,
      errored: null,
      closed: false,
      closeEmitted: false,
      defaultEncoding: 'utf8',
      awaitDrainWriters: null,
      multiAwaitDrain: false,
      readingMore: false,
      dataEmitted: false,
      decoder: null,
      encoding: null,
      [Symbol(kPaused)]: null
    },
    _events: [Object: null prototype] { prefinish: [Function: prefinish] },
    _eventsCount: 1,
    _maxListeners: undefined,
    _writableState: WritableState {
      objectMode: false,
      highWaterMark: 16384,
      finalCalled: false,
      needDrain: false,
      ending: false,
      ended: false,
      finished: false,
      destroyed: false,
      decodeStrings: true,
      defaultEncoding: 'utf8',
      length: 0,
      writing: false,
      corked: 0,
      sync: true,
      bufferProcessing: false,
      onwrite: [Function: bound onwrite],
      writecb: null,
      writelen: 0,
      afterWriteTickInfo: null,
      buffered: [],
      bufferedIndex: 0,
      allBuffers: true,
      allNoop: true,
      pendingcb: 0,
      prefinished: false,
      errorEmitted: false,
      emitClose: true,
      autoDestroy: true,
      errored: null,
      closed: false
    },
    allowHalfOpen: true,
    [Symbol(kCapture)]: false,
    [Symbol(kTransformState)]: {
      afterTransform: [Function: bound afterTransform],
      needTransform: false,
      transforming: false,
      writecb: null,
      writechunk: null,
      writeencoding: null
    }
  }
}

I have a picture from my CloudWatch

enter image description here

Can anyone help me, and tell me what I'm doing wrong?

Upvotes: 0

Views: 2175

Answers (1)

jarmod
jarmod

Reputation: 78663

The issue with your code is that it's not correctly dealing with the asynchronous nature of JavaScript. Specifically, your code is exiting before any asynchronous activity has completed.

Your Lambda function is async so it should return a promise that is ultimately settled (fulfilled or rejected) when your processing of the S3 object(s) has completed. This allows the AWS Lambda runtime environment to await completion.

For example:

exports.handler =  async function(event, context) {
  const promises = event.Records.map((record) => {
    const Bucket = record.s3.bucket.name;
    const Key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
    const params = { Bucket, Key };
    const stream = s3.getObject(params).createReadStream();

    return new Promise(function(resolve, reject) {
      csv.parseStream(stream, {
        headers: true
      }).on('data', (data) => {
        console.log(data);
      }).on('error', (error) => {
        console.error(error);
        reject(error);
      }).on('end', (rows) => {
        console.log(`Parsed ${rows} rows`);
        resolve(rows);
      });
    });
  });

  return Promise.all(promises);
}

Upvotes: 1

Related Questions