Oron Bendavid

Reputation: 1533

@aws-sdk/lib-storage to Stream JSON from MongoDB to S3 with JSONStream.stringify()

I'm trying to Stream JSON from MongoDB to S3 with the new version of @aws-sdk/lib-storage:

"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",

Try #1: It seems that I'm not using JSONStream.stringify() correctly:

import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import JSONStream from 'JSONStream';
import { env } from '../../../env';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};

Error #1:

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object
    at Function.from (buffer.js:305:9)
    at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
    at processTicksAndRejections (internal/process/task_queues.js:94:5)
    at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
    at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
    at async Promise.all (index 0)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Try #2, using the variable jsonStream:

    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    const jsonStream = readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: jsonStream,
      },
    });

Error #2:

ReferenceError: ReadableStream is not defined
    at Object.getChunk (/.../node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)

Try #3: use stream.PassThrough:

    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));

...

const stream = require('stream');
export const uploadStreamFile = async(fileName) => {
  try{

    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    const res = await upload.done();
    
    log.info('finished uploading file', fileName);
    return res;
  }
  catch(err){
    return;
  }
};

Error #3:

TypeError: dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8)

Try #4: mongodb.stream({transform: doc => JSON.stringify...}) instead of JSONStream:

import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc) + '\n' });
  
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
  
    await upload.done(); 
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};

Error #4:

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object
    at Function.from (buffer.js:305:9)
    at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
    at processTicksAndRejections (internal/process/task_queues.js:94:5)
    at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
    at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
    at async Promise.all (index 0)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Try #5: using stream.PassThrough() and returning pass to pipe into:

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream({ transform: doc => JSON.stringify(doc) + '\n' });
    readStream.pipe(uploadStreamFile());
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


const stream = require('stream');

export const uploadStreamFile = async() => {
  try{
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    await upload.done();
    return pass;
  }
  catch(err){
    log.error('pawoooooo', err);
    return;
  }
};

Error #5:

TypeError: dest.on is not a function at Cursor.pipe (_stream_readable.js:680:8)

Upvotes: 7

Views: 3808

Answers (2)

Oron Bendavid

Reputation: 1533

I found an additional solution using stream.PassThrough. Note that JSONStream.stringify() streams the documents as a single JSON array rather than one document after another (a note on newline-delimited output follows the snippet):

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const passThroughStream = new stream.PassThrough();
    const readStream = db.collection(collectionName)
      .find('{}')
      .stream();

    readStream.on('end', () => passThroughStream.end());

    readStream.pipe(JSONStream.stringify()).pipe(passThroughStream);
    await uploadStreamFile('benda_mongo.json', passThroughStream);
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


export const uploadStreamFile = async(fileName, stream) => {
  try{
    log.info('start uploading file', fileName);
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: `${fileName}`,
        Body: stream,
      },
    });

    const res = await upload.done();
    log.info('finished uploading file', fileName);
    return res;
  }
  catch(err){
    log.error(err);
    return;
  }
};
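
A note on the output shape (my addition, reusing db, collectionName and uploadStreamFile from the snippet above): if you prefer newline-delimited documents over a single JSON array, JSONStream.stringify(false) should separate the documents with newlines and omit the surrounding brackets:

const passThroughStream = new stream.PassThrough();

db.collection(collectionName)
  .find({})
  .stream()
  .pipe(JSONStream.stringify(false)) // newline-separated documents, no '[' / ']'
  .pipe(passThroughStream);

await uploadStreamFile('benda_mongo.ndjson', passThroughStream);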

Upvotes: 2

jccampanero

Reputation: 53421

After reviewing your error stack traces, the problem probably has to do with the fact that the MongoDB driver provides a cursor in object mode, whereas the Body parameter of Upload requires a traditional stream whose chunks can be turned into a Buffer.
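
To illustrate the mismatch (my own illustration, not part of the original explanation): the stack trace shows Buffer.from being called on each chunk inside lib-storage, and that call fails for a raw document object but succeeds for a serialized string:

Buffer.from({ _id: 1, name: 'benda' });                 // TypeError [ERR_INVALID_ARG_TYPE]
Buffer.from(JSON.stringify({ _id: 1, name: 'benda' })); // OK: a string chunk can be buffered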

Taking your original code as a reference, you can try providing a Transform stream to satisfy both requirements.

Please, consider for instance the following code:

import { Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';
const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    // We are creating here a Transform to adapt both sides
    const toJSONTransform = new Transform({
      writableObjectMode: true,
      transform(chunk, encoding, callback) {
        this.push(JSON.stringify(chunk) + '\n');
        callback();  
      }  
    });

    readStream.pipe(toJSONTransform);
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: toJSONTransform,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};

In the code, in toJSONTransform we define the writable side of the stream in object mode; in contrast, the readable side emits plain strings, which should be suitable for being read by the S3 Upload... at least, I hope so.
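
As a side note, beyond the original suggestion: plain pipe() does not forward errors between streams, so if the cursor or the transform fails, the awaited upload can simply hang. A small sketch, reusing readStream, toJSONTransform and s3Client from the code above, that surfaces such errors with stream.pipeline:

import { pipeline } from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

const upload = new Upload({
  client: s3Client,
  params: {
    Bucket: 'test-bucket',
    Key: 'extracted-data/benda_mongo.json',
    Body: toJSONTransform,
  },
});

// pipelineAsync rejects if readStream or toJSONTransform errors,
// so the Promise.all fails fast instead of waiting on upload.done() forever.
await Promise.all([
  pipelineAsync(readStream, toJSONTransform),
  upload.done(),
]);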

Regarding the second error you reported, the one related to dest.on: I initially thought, and wrote to you about the possibility, that the error arose because uploadStreamFile is an async function, so it returns a Promise rather than a stream, and you are passing that Promise to pipe, which requires a stream; basically, that you returned the wrong variable. But I hadn't realized that you are also passing the PassThrough stream as a param to Upload: please be aware that this stream never contains any information, because nothing is ever piped or written into it; the contents of the readable stream obtained from the MongoDB query never reach it, nor the Upload itself.
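
For completeness, a minimal sketch, not taken from the question, of how the PassThrough variant could be wired so that data actually reaches it: pipe into the PassThrough first, then hand it to Upload as the Body and await the result:

import { PassThrough } from 'stream';

const pass = new PassThrough();

// Data has to flow into the PassThrough before/while Upload consumes it,
// otherwise the Body stays empty and the upload never completes.
readStream
  .pipe(toJSONTransform) // or JSONStream.stringify(), as in the earlier attempts
  .pipe(pass);

const upload = new Upload({
  client: s3Client,
  params: {
    Bucket: 'test-bucket',
    Key: 'extracted-data/benda_mongo.json',
    Body: pass,
  },
});

await upload.done();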

Upvotes: 2
