Reputation: 985
I am working on a Lambda that converts a CSV file landed in Bucket-A (source) to NDJSON and moves it to Bucket-B (destination).
The logic below works as expected for small files, but my CSV files will be over 200 MB, and some around 2.5 GB, and this logic times out even with the Lambda timeout set to the maximum.
I was looking at a post that talked about using the Lambda /tmp space to write/append the records directly to a file, which can then be uploaded to S3, but the /tmp space is limited to roughly 500 MB.
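For reference, a minimal sketch of what that /tmp write-then-upload pattern could look like (this is my own illustration, not code from the post; the scratch file name is arbitrary, and the approach only works while the NDJSON output fits in the /tmp limit mentioned above):

import csv
import json
import codecs
import boto3

def csv_to_ndjson_via_tmp(source_bucket, source_key, target_bucket, target_key):
    s3_client = boto3.client('s3')
    body = s3_client.get_object(Bucket=source_bucket, Key=source_key)['Body']
    tmp_path = '/tmp/output.ndjson'  # hypothetical scratch file name
    with open(tmp_path, 'w') as out:
        # codecs.getreader lets csv.DictReader consume the streaming body
        # without loading the whole object into memory first
        for row in csv.DictReader(codecs.getreader('utf-8')(body)):
            out.write(json.dumps(row) + '\n')
    # Upload the finished file from /tmp to the destination bucket
    s3_client.upload_file(tmp_path, target_bucket, target_key)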
Thank you for reading through.
Any help to tackle this is greatly appreciated.
import boto3
import ndjson
import csv
from datetime import datetime, timezone
from io import StringIO
import os

def lambda_handler(event, context):
    errMsg = None
    target_resp_list = []
    l_utcdatetime = datetime.utcnow()
    l_timestamp = l_utcdatetime.strftime('%Y%m%d%H%M%S')

    s3 = boto3.resource('s3')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb', region_name=os.environ['AWS_REGION'])

    for record in event["Records"]:
        # Source bucket and key of the new file landed
        source_bucket = record["s3"]["bucket"]["name"]
        source_key = record["s3"]["object"]["key"]
        source_file_name = source_key.split("/")[-1].split(".")[0]

        # Read the whole object into memory
        bucket = s3.Bucket(source_bucket)
        obj = bucket.Object(key=source_key)
        response = obj.get()
        records = StringIO(response['Body'].read().decode())

        # Loop through the CSV records and add each one to the response list,
        # adding the snapshot_datetime to every record
        for row in csv.DictReader(records):
            row['source_snapshot_datetime'] = f'{l_utcdatetime}'
            target_resp_list.append(row)

    # The below attributes are used in copying the ndjson file to the destination bucket
    l_target_bucket = os.getenv("TargetBucket")
    l_target_prefix = os.getenv("TargetPrefix")
    l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}.ndjson"

    # Moving the ndjson file to the Snowflake staging bucket
    try:
        s3_client.put_object(Body=ndjson.dumps(target_resp_list),
                             Bucket=l_target_bucket,
                             Key=l_target_key)
        print("File moved to destination bucket.")
    except Exception as ex1:
        errMsg = f"Error while copying the file from source to destination bucket - {ex1}"

    # Raise exception in case of copy failure
    if errMsg is not None:
        raise Exception(errMsg)
Upvotes: 0
Views: 276
Reputation: 985
Leaving this here for anyone who might come looking later.
I presume the issue is ndjson.dumps taking a huge amount of time to convert the list and push it to S3, so what I did is use a counter to chunk the source records into batches of 50K, and then call dumpChunkToS3(), which is basically the logic that dumps to S3.
An extra conditional statement is required after the loop, as the number of rows/records is not going to be an even multiple of 50K (in my case at least).
# Chunk counter, row counter and buffer (initialised before the loop)
rowscnt = 0
chunk_id = 0
target_resp_list = []

# Loop through the CSV records, adding the snapshot_datetime to each record,
# and dump every 50K rows to S3 as its own chunk
for row in csv.DictReader(records):
    row['source_snapshot_datetime'] = f'{l_utcdatetime}'
    rowscnt += 1
    target_resp_list.append(row)
    if rowscnt == 50000:
        chunk_id += 1
        respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)
        rowscnt = 0
        del target_resp_list[:]

# Dump whatever is left over after the last full 50K chunk
if rowscnt > 0:
    chunk_id += 1
    respMsg = dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id)
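The answer does not show dumpChunkToS3 itself; a minimal sketch of such a helper, assuming the same TargetBucket/TargetPrefix environment variables and ndjson.dumps call as in the question (the key pattern with the chunk id is my assumption):

def dumpChunkToS3(s3_client, source_file_name, target_resp_list, chunk_id):
    # Hypothetical helper, not from the original post: writes one chunk of
    # records to the destination bucket as its own NDJSON object.
    # Relies on the same os/ndjson/datetime imports as the handler above.
    l_timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')  # or pass it in from the handler
    l_target_bucket = os.getenv("TargetBucket")
    l_target_prefix = os.getenv("TargetPrefix")
    l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}_chunk{chunk_id}.ndjson"
    s3_client.put_object(Body=ndjson.dumps(target_resp_list),
                         Bucket=l_target_bucket,
                         Key=l_target_key)
    return f"Chunk {chunk_id} written to {l_target_key}"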
Upvotes: 0
Reputation: 3207
Lambda can run for up to 15 minutes per execution. I would recommend first checking locally what your worst-case processing time for a file is. If you expect huge files, try bumping the Lambda memory to the maximum feasible value that fulfils your requirements.
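For example (the function name below is a placeholder, not from the original answer), the memory and timeout can be raised programmatically with boto3:

import boto3

# Raise the function's memory and timeout to the documented maximums
# before re-testing with a large file.
lambda_client = boto3.client('lambda')
lambda_client.update_function_configuration(
    FunctionName='csv-to-ndjson-converter',  # placeholder function name
    MemorySize=10240,                        # MB, current maximum
    Timeout=900                              # seconds, i.e. 15 minutes
)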
Hints:
Upvotes: 1