Vikram Sodhan

Reputation: 21

Converting JSON to Parquet in a NodeJS lambda to write into S3

I am running an AWS Lambda function with NodeJS as the language. This lambda receives some JSON input that I need to transform into Parquet format before writing it to S3.

Currently, I'm using the parquetjs library to convert the JSON data to Parquet. However, the parquet.ParquetWriter.openFile function creates a local file, and I need to write directly to S3.

Ideally, I would like to convert the JSON data to Parquet in memory and then send it directly to S3. Since this Lambda function will be heavily used, I need to optimize it for high loads and avoid writing to a local disk.

What would be the best practice for achieving this?

Thank you in advance for your help!

Upvotes: 1

Views: 4477

Answers (3)

Vikram Sodhan

Reputation: 21

To build on the answer provided by @oieduardorabelo, here is the code I used to handle the conversion and the upload to S3.

const parquet = require('parquetjs');
const fs = require('fs');
const crypto = require('crypto');
const aws = require('aws-sdk');

const s3 = new aws.S3();

// generate a random hex string to use as the temporary file name
const uuid = () => crypto.randomBytes(16).toString('hex');

const convertToParquetAndPutInS3 = async (bucket, folder, schema, obj) => {
  const fileName = `/tmp/${uuid()}`;
  try {
    const parquetSchema = new parquet.ParquetSchema(schema);

    const writer = await parquet.ParquetWriter.openFile(parquetSchema, fileName);
    await writer.appendRow(obj);
    await writer.close();

    const fileStream = fs.createReadStream(fileName);

    const params = {
      Bucket: bucket,
      // note: include a unique file name in the key (e.g. the generated uuid) so
      // concurrent invocations don't overwrite the same object
      Key: folder,
      Body: fileStream,
    };

    await s3.putObject(params).promise();
  } finally {
    await deleteFilePromise(fileName);
  }
};

const deleteFilePromise = (filePath) => {
  return new Promise((resolve, reject) => {
    fs.unlink(filePath, (err) => {
      if (err) reject(err);
      else resolve();
    });
  });
};

Upvotes: 1

ns15

Reputation: 8704

You can try the DuckDB library to convert the JSON to Parquet and write the Parquet output directly to S3. All of this is done in a single statement.

var duckdb = require('duckdb');
var db = new duckdb.Database(':memory:');

// mysource.json is the JSON file on the Lambda disk
var query = `
  INSTALL httpfs;
  LOAD httpfs;
  SET s3_region='ap-south-1';
  SET s3_access_key_id='***';
  SET s3_secret_access_key='***';
  COPY 'mysource.json' TO 's3://parq-write-bucket/my_data.parq' (FORMAT 'parquet');
`;

db.all(query, function (err, res) {
  if (err) throw err;
  console.log(res);
});

Here, mysource.json is the JSON file on the Lambda disk. If you want to avoid writing to the Lambda disk altogether, you could first insert the JSON into a DuckDB table and then export the table as Parquet to S3, as sketched below.
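
A rough sketch of that table-based route, reusing the bucket, region, and credentials from the snippet above; the staging table, its columns, and the sample rows are made up for illustration:

var duckdb = require('duckdb');
var db = new duckdb.Database(':memory:');

// illustrative input that is already in memory (nothing is written to /tmp)
var rows = [{ id: 1, name: 'name-1', value: 'value-1' }];

db.exec(
  "INSTALL httpfs; LOAD httpfs;" +
  " SET s3_region='ap-south-1'; SET s3_access_key_id='***'; SET s3_secret_access_key='***';" +
  " CREATE TABLE staging (id INTEGER, name VARCHAR, value VARCHAR);",
  function (err) {
    if (err) throw err;

    // insert the in-memory rows with a prepared statement
    var stmt = db.prepare('INSERT INTO staging VALUES (?, ?, ?)');
    rows.forEach(function (row) {
      stmt.run(row.id, row.name, row.value);
    });

    stmt.finalize(function (err2) {
      if (err2) throw err2;
      // export the in-memory table straight to S3 as Parquet
      db.exec("COPY staging TO 's3://parq-write-bucket/my_data.parq' (FORMAT 'parquet');", function (err3) {
        if (err3) throw err3;
        console.log('done');
      });
    });
  }
);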


Some useful links: JSON with duckdb, duckdb s3 export, duckdb to s3.

Upvotes: 1

oieduardorabelo

Reputation: 2985

Using out-of-the-box dependencies, you will need to write the JSON-to-Parquet output to a local file first. Then you can stream-read that file and upload it to S3.

AWS Lambda includes a 512 MB temporary file system (/tmp) for your code, and using it doesn't cause any performance hit. Depending on the size of your payload, you may need to increase it; it can be configured up to 10 GB.
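
For reference, a rough sketch of raising the /tmp size with the AWS SDK; the function name and size below are placeholders, and this assumes an aws-sdk version recent enough to support the EphemeralStorage parameter:

const aws = require('aws-sdk');
const lambda = new aws.Lambda();

// raise /tmp from the default 512 MB to 2 GB for the target function
lambda
  .updateFunctionConfiguration({
    FunctionName: 'my-json-to-parquet-fn', // placeholder name
    EphemeralStorage: { Size: 2048 }, // MB, valid range is 512-10240
  })
  .promise()
  .then((res) => console.log(res))
  .catch((err) => console.error(err));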

Pseudo-code (1):

const fs = require("fs");
const parquet = require("parquetjs");
const aws = require("aws-sdk");

const s3 = new aws.S3();

const bodyRequest = {
  id: 1,
  payload: [
    {
      payloadid: 1,
      name: "name-1",
      value: "value-1",
    },
    {
      payloadid: 2,
      name: "name-2",
      value: "value-2",
    },
  ],
};
// schema types match the sample payload above: numeric ids, string name/value fields
const schema = new parquet.ParquetSchema({
  id: { type: "INT64" },
  payload: {
    repeated: true,
    fields: {
      payloadid: { type: "INT64" },
      name: { type: "UTF8" },
      value: { type: "UTF8" },
    },
  },
});
const writer = await parquet.ParquetWriter.openFile(
  schema,
  "/tmp/example.parquet"
);

await writer.appendRow({
  id: bodyRequest["id"],
  payload: bodyRequest["payload"],
});

await writer.close();

const fileStream = fs.createReadStream("/tmp/example.parquet");
const s3Key = "2022/07/07/example.parquet";
try {
  const params = { Bucket: "bucket", Key: s3Key, Body: fileStream };
  const result = await s3.putObject(params).promise();
  fs.unlink("/tmp/example.parquet", function (err) {
    if (err) {
      console.error(err);
    } else {
      console.log("File has been deleted");
    }
  });
  console.log(result);
} catch (e) {
  console.error(e);
}

Depending on the request throughput, you may need an SQS queue between the services to perform the transformation in batches. For example:

Request -> Lambda -> S3/json -> S3 Notification -> SQS and batch 50 messages -> Lambda transformation -> S3/parquet
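
A rough sketch of the transformation Lambda in that flow; the bucket name is a placeholder, each SQS record is assumed to wrap an S3 event notification, and writeJsonAsParquet is a hypothetical wrapper around the parquetjs pseudo-code above:

const aws = require('aws-sdk');
const s3 = new aws.S3();

exports.handler = async (event) => {
  for (const record of event.Records) {
    // each SQS record body is an S3 event notification for one JSON object
    const s3Event = JSON.parse(record.body);
    for (const rec of s3Event.Records || []) {
      const bucket = rec.s3.bucket.name;
      const key = decodeURIComponent(rec.s3.object.key.replace(/\+/g, ' '));

      const obj = await s3.getObject({ Bucket: bucket, Key: key }).promise();
      const json = JSON.parse(obj.Body.toString('utf8'));

      // hypothetical wrapper: converts with parquetjs (as in the pseudo-code above)
      // and uploads the result to the parquet bucket
      await writeJsonAsParquet(json, 'parquet-bucket', key.replace(/\.json$/, '.parquet'));
    }
  }
};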


Another solution would be to use AWS Glue to transform S3 objects from JSON to Parquet: https://hkdemircan.medium.com/how-can-we-json-css-files-transform-to-parquet-through-aws-glue-465773b43dad

The flow would be: Request -> Lambda -> S3/json, and then S3/json <- Glue Crawler -> S3/parquet. You can run the crawler on a schedule (every X minutes) or trigger it via S3 events.

Upvotes: 2
