Prashanth kumar

Reputation: 985

Lambda - Python - CSV to NDJSON - Fails to dump huge files

I am working on a Lambda that converts a CSV file landing in Bucket-A (source) to NDJSON and moves it to Bucket-B (destination).

The logic below works as expected for small files, but my CSV files are expected to be over 200 MB, with some around 2.5 GB, and it times out even with the Lambda timeout set to the maximum.

I was looking at a post that talked about using Lambda's /tmp space to write/append the records directly to a file, which can then be uploaded to S3, but the /tmp space maxes out at about 512 MB.
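
For reference, this is roughly what I understood that approach to be (just a sketch; streaming the body via iter_lines is my interpretation of "write/append the info to a file", and the variable names match my handler below):

import json  # in addition to the imports in the handler below

local_path = f"/tmp/{source_file_name}_{l_timestamp}.ndjson"

# Stream the CSV from S3 line by line instead of reading it all into memory,
# appending each row as one JSON line to a local file under /tmp.
obj = s3_client.get_object(Bucket=source_bucket, Key=source_key)
lines = (line.decode("utf-8") for line in obj["Body"].iter_lines())
with open(local_path, "w") as out_file:
    for row in csv.DictReader(lines):
        row["source_snapshot_datetime"] = f"{l_utcdatetime}"
        out_file.write(json.dumps(row) + "\n")

# Upload the finished file to the destination bucket.
s3_client.upload_file(local_path, l_target_bucket, l_target_key)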

Thank you for reading through.
Any help tackling this is greatly appreciated.

import boto3
import ndjson
import csv
from datetime import datetime, timezone
from io import StringIO
import os

def lambda_handler(event, context):
    errMsg = None
    target_resp_list = []
    l_utcdatetime = datetime.utcnow()
    l_timestamp = l_utcdatetime.strftime('%Y%m%d%H%M%S')
    
    s3 = boto3.resource('s3')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb', region_name=os.environ['AWS_REGION'])
    
    for record in event["Records"]:
        
        # Source bucket and key of the new file landed
        source_bucket = record["s3"]["bucket"]["name"]
        source_key = record["s3"]["object"]["key"]
        source_file_name = source_key.split("/")[-1].split(".")[0]
        
        bucket = s3.Bucket(source_bucket)
        obj = bucket.Object(key=source_key)
        response = obj.get()
        records = StringIO(response['Body'].read().decode())

        # loop through the csv records and add it to the response list, while adding the snapshot_datetime to each record
        for row in csv.DictReader(records):
            row['source_snapshot_datetime'] = f'{l_utcdatetime}'
            target_resp_list.append(row)

        # The below attributes are used in copying the ndjson file to the destination bucket
        l_target_bucket = os.getenv("TargetBucket")
        l_target_prefix = os.getenv("TargetPrefix")
        l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}.ndjson"

        # Moving the ndjson file to Snowflake staging bucket
        try:
            s3_client.put_object(Body=ndjson.dumps(target_resp_list), 
                Bucket=l_target_bucket, 
                Key=l_target_key
            )
            print("File moved to destination bucket.")
        except Exception as ex1:
            errMsg = f"Error while copying the file from source to destination bucket - {ex1}"
        
        # Raise exception in case of copy fail
        if errMsg is not None:
            raise Exception(errMsg)

Upvotes: 0

Views: 276

Answers (2)

Prashanth kumar

Reputation: 985

Leaving this here for anyone who might come looking later.

I presume the issue was ndjson.dumps taking a huge amount of time to convert the full list and push it to S3, so I used a counter to chunk the source records into batches of 50K each, calling dumpChunkToS3(), which is basically the logic that dumps a chunk to S3.
An extra conditional statement is required after the loop, because the number of rows/records will not divide evenly by 50K (in my case, at least).

rowscnt = 0
chunk_id = 0
target_resp_list = []

# loop through the csv records and add them to the response list, while adding the snapshot_datetime to each record
for row in csv.DictReader(records):
    row['source_snapshot_datetime'] = f'{l_utcdatetime}'
    rowscnt += 1
    target_resp_list.append(row)
    # every 50K rows, dump the accumulated chunk to S3 and reset the buffer
    if rowscnt == 50000:
        chunk_id += 1
        respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)
        rowscnt = 0
        del target_resp_list[:]

# dump whatever is left over after the last full 50K chunk
if rowscnt > 0:
    chunk_id += 1
    respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)
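
dumpChunkToS3() is not shown above; a minimal sketch of what it could look like, reusing the imports and the TargetBucket/TargetPrefix environment variables from the question (the signature, key naming and return value are assumptions):

def dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id):
    # Serialize one chunk to NDJSON and write it to the target bucket,
    # embedding the chunk_id in the key so chunks do not overwrite each other.
    l_timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
    l_target_bucket = os.getenv("TargetBucket")
    l_target_prefix = os.getenv("TargetPrefix")
    l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}_{chunk_id}.ndjson"
    s3_client.put_object(
        Body=ndjson.dumps(target_resp_list),
        Bucket=l_target_bucket,
        Key=l_target_key
    )
    return f"Chunk {chunk_id} written to s3://{l_target_bucket}/{l_target_key}"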

Upvotes: 0

Traycho Ivanov

Reputation: 3207

Lambda can run for up to 15 minutes per execution. I would recommend first checking locally what your worst-case processing time for a file is. If you expect huge files, try bumping the Lambda memory to the maximum value that still fits your requirements.
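
For example, both limits can be raised with a one-off call like this (a sketch using boto3; the function name is a placeholder):

import boto3

# Raise the timeout to the 15-minute maximum and give the function more memory
# (which also gives it more CPU). Adjust MemorySize to what your workload needs.
lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="csv-to-ndjson-converter",   # placeholder name
    Timeout=900,       # seconds, i.e. 15 minutes
    MemorySize=3008
)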

Hints:

  • Try compressing the files: CSV files that are gigabytes uncompressed shrink to megabytes, since plain text compresses very well (see the sketch after this list).
  • Try splitting the work in advance: if one Lambda splits the huge file and other Lambdas process the pieces, you will not have to worry much about the execution time-out.
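
A minimal sketch of the compression idea, gzip-compressing the NDJSON before uploading it (the bucket, key and records list are placeholders, not from the question):

import gzip
import json
import boto3

s3_client = boto3.client("s3")
records = [{"id": 1, "name": "example"}]   # placeholder for the converted rows

# Serialize to NDJSON, gzip the bytes, and upload with a matching Content-Encoding
# so downstream consumers know the object is compressed.
ndjson_bytes = "\n".join(json.dumps(r) for r in records).encode("utf-8")
s3_client.put_object(
    Body=gzip.compress(ndjson_bytes),
    Bucket="my-staging-bucket",            # placeholder
    Key="staging/my_file.ndjson.gz",       # placeholder
    ContentEncoding="gzip"
)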

Upvotes: 1
