Reputation: 89
I'm trying to write an AWS Lambda function that converts CSV files to Parquet. I can do this with pyarrow, but the package is too large (~200 MB uncompressed) to fit in a Lambda deployment package. Instead, I'm trying to write the Parquet file directly to an S3 bucket through an in-memory BytesIO buffer, using fastparquet as the engine.
Below is my lambda function code:
import json
import boto3
import pandas as pd
from io import BytesIO

def lambda_handler():
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(
        Bucket = 'mybucket',
        Prefix = 'subfolder/'
    )
    files = get_object_keys(response)
    for file in files:
        obj = s3.get_object(Bucket='mybucket', Key=file)
        df = pd.read_csv(obj['Body'], sep='|')
        buf = BytesIO()
        df.to_parquet(buf, engine='fastparquet', index=False, compression='snappy')
        buf.seek(0)
        key = f"output/{file.split('/')[1].split('.')[0]}.parquet"
        s3.put_object(Bucket='mybucket', Body=buf.getvalue(), Key=key)

def get_object_keys(response):
    files = []
    for content in response['Contents']:
        if content['Key'].endswith('.csv'):
            files.append(content['Key'])
    return files

lambda_handler()
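A side note on the key handling in the code above, separate from the error itself: `response['Contents']` is absent when no objects match the prefix, and `file.split('/')[1].split('.')[0]` only works for keys exactly one folder deep with a single dot in the name. The sketch below is one possible way to make both steps more tolerant. The helper names `csv_keys` and `parquet_key` are hypothetical, and `parquet_key` keeps everything before the last extension, which differs slightly from the original for names such as `a.b.csv`.

```python
import posixpath


def csv_keys(response):
    """Return the keys ending in .csv from a list_objects_v2-style response.

    Uses .get() so a response without a 'Contents' entry yields an empty list.
    """
    return [
        item["Key"]
        for item in response.get("Contents", [])
        if item["Key"].endswith(".csv")
    ]


def parquet_key(csv_key, output_prefix="output/"):
    """Map an input CSV key to an output Parquet key under output_prefix.

    Only the final path component is used, so nested input keys also work.
    """
    filename = posixpath.basename(csv_key)       # e.g. "data.csv"
    stem, _ext = posixpath.splitext(filename)    # e.g. "data"
    return f"{output_prefix}{stem}.parquet"
```

With these, the loop could iterate over `csv_keys(response)` and pass `parquet_key(file)` as the `Key` argument to `put_object`.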
When I use 'fastparquet' as the engine in DataFrame.to_parquet(), I get the following error:
Traceback (most recent call last):
  File ".\lambda_function.py", line 77, in <module>
    lambda_handler()
  File ".\lambda_function.py", line 64, in lambda_handler
    df.to_parquet(buf, engine='fastparquet', index=False, compression='snappy')
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\util\_decorators.py", line 214, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py", line 2116, in to_parquet
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\io\parquet.py", line 264, in to_parquet
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\io\parquet.py", line 185, in write
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\writer.py", line 880, in write
    compression, open_with, has_nulls, append)
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\writer.py", line 734, in write_simple
    with open_with(fn, mode) as f:
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\util.py", line 42, in default_open
    return open(f, mode)
TypeError: expected str, bytes or os.PathLike object, not _io.BytesIO
Does anyone know how to fix this?
Upvotes: 8
Views: 2331
Reputation: 96
This error was resolved by using pyarrow as the writing engine.
Sample code:
import io
import boto3

buffer = io.BytesIO()
df.to_parquet(buffer, engine="pyarrow", index=False)
s3_resource = boto3.resource('s3')
# put() expects the capitalized keyword Body
s3_resource.Object(bucketname, path_withfilename).put(Body=buffer.getvalue())
For reading Parquet files in Python 3.6 I used fastparquet, but for writing, the pyarrow engine seems to work with an in-memory buffer.
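If pyarrow is too large for the deployment package, as the question states, one possible workaround is to stay with fastparquet and give it what the traceback suggests it expects: a real file path. The traceback ends in `open(f, mode)`, so at least in that fastparquet version the target has to be a path rather than a BytesIO object. Below is a minimal sketch, assuming the Lambda environment's temporary directory (`/tmp`) is writable and large enough for the file. The helper name `to_bytes_via_tempfile` is hypothetical and not part of pandas, fastparquet, or boto3.

```python
import os
import tempfile


def to_bytes_via_tempfile(write_to_path, suffix=".parquet"):
    """Call write_to_path(path) on a temporary file and return the file's bytes.

    write_to_path is any callable that writes its output to the given path,
    which suits writers that accept a filename but not a file-like object.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)  # the writer reopens the file by name
    try:
        write_to_path(path)
        with open(path, "rb") as f:
            return f.read()
    finally:
        os.remove(path)  # free the scratch space for later iterations


# Hypothetical usage inside the loop from the question (not executed here):
#
#   data = to_bytes_via_tempfile(
#       lambda p: df.to_parquet(p, engine='fastparquet', index=False,
#                               compression='snappy')
#   )
#   s3.put_object(Bucket='mybucket', Body=data, Key=key)
```

The trade-off is one extra write and read on local disk per file, in exchange for not needing pyarrow in the package.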
Upvotes: 3