Ariel
Ariel

Reputation: 3611

How to upload File in FastAPI, then to Amazon S3 and finally process it?

I have a FastAPI endpoint that receives a file, uploads it to s3, and then processes it. Everything works fine except for the processing, that fails with this message:

  File "/usr/local/lib/python3.9/site-packages/starlette/datastructures.py", line 441, in read
    return self.file.read(size)
  File "/usr/local/lib/python3.9/tempfile.py", line 735, in read
    return self._file.read(*args)
ValueError: I/O operation on closed file.

My simplified code looks like this:

async def process(file: UploadFile):
    reader = csv.reader(iterdecode(file.file.read(), "utf-8"), dialect="excel")  # This fails!
    datarows = []
    for row in reader:
        datarows.append(row)
    return datarows

How can I read the contents of the uploaded file?

UPDATE

I managed to isolate the problem a bit more. Here's my simplified endpoint:

import boto3
from loguru import logger
from botocore.exceptions import ClientError


UPLOAD = True

@router.post("/")
async def upload(file: UploadFile = File(...)):
    if UPLOAD:
        # Upload the file
        s3_client = boto3.client("s3", endpoint_url="http://localstack:4566")
        try:
            s3_client.upload_fileobj(file.file, "local", "myfile.txt")
        except ClientError as e:
            logger.error(e)
    contents = await file.read()
    return JSONResponse({"message": "Success!"})

If UPLOAD is True, I get the error. If it's not, everything works fine. It seems boto3 is closing the file after uploading it. Is there any way I can reopen the file? Or send a copy to upload_fileobj?

Upvotes: 9

Views: 28700

Answers (2)

Chris
Chris

Reputation: 34109

FastAPI's (actually Starlette's) UploadFile (see Starlette's documentation as well) uses Python's SpooledTemporaryFile, a "file stored in memory up to a maximum size limit, and after passing this limit it will be stored in disk.". It "operates exactly as TemporaryFile", which "is destroyed as soon as it is closed (including an implicit close when the object is garbage collected)". Hence, it seems that once the contents of the file have been read by boto3, the file gets closed, which, in turn, causes the file to be deleted.

Option 1

If the server supports it, you could read the file contents—using contents = file.file.read(), as shown in this answer (or for async reading/writing see here)—and then upload these contents (i.e.,bytes) to your server directly.

Otherwise, you can again read the contents and then move the file's reference point at the beginning of the file. In a file there is an internal "cursor" (or "file pointer") denoting the position from which the file contents will be read (or written). When calling read() reads all the way to the end of the buffer, leaving zero bytes beyond the cursor. Thus, one could also use the seek() method to set the current position of the cursor to 0 (i.e., rewinding the cursor to the start of the file); thus, allowing you to pass the file object (i.e., upload_fileobj(file.file) see this answer) after reading the file contents.

As per FastAPI's documentation:

seek(offset): Goes to the byte position offset (int) in the file.

  • E.g., await myfile.seek(0) would go to the start of the file.
  • This is especially useful if you run await myfile.read() once and then need to read the contents again.

Example

from fastapi import File, UploadFile, HTTPException

@app.post('/')
def upload(file: UploadFile = File(...)):
    try:
        contents = file.file.read()
        file.file.seek(0)
        # Upload the file to to your S3 service
        s3_client.upload_fileobj(file.file, 'local', 'myfile.txt')
    except Exception:
        raise HTTPException(status_code=500, detail='Something went wrong')
    finally:
        file.file.close()

   print(contents)  # Handle file contents as desired
   return {"filename": file.filename}

Option 2

Copy the contents of the file into a NamedTemporaryFile, which, unlike TemporaryFile, "has a visible name in the file system" that "can be used to open the file" (that name can be retrieved from the .name attribute ). Additionally, it can remain accesible after it is closed, by setting the delete argument to False; thus, allowing the file to reopen when needed. Once you are done with it, you can delete it using the os.remove() or os.unlink() method. Below is a working example (inspired by this answer):

from fastapi import FastAPI, File, UploadFile, HTTPException
from tempfile import NamedTemporaryFile
import os

app = FastAPI()

@app.post("/upload")
def upload_file(file: UploadFile = File(...)):
    temp = NamedTemporaryFile(delete=False)
    try:
        try:
            contents = file.file.read()
            with temp as f:
                f.write(contents);
        except Exception:
            raise HTTPException(status_code=500, detail='Error on uploading the file')
        finally:
            file.file.close()
            
        # Upload the file to your S3 service using `temp.name`
        s3_client.upload_file(temp.name, 'local', 'myfile.txt')
        
    except Exception:
        raise HTTPException(status_code=500, detail='Something went wrong')
    finally:
        #temp.close()  # the `with` statement above takes care of closing the file
        os.remove(temp.name)  # Delete temp file
    
    print(contents)  # Handle file contents as desired
    return {"filename": file.filename}

Option 3

You could even keep the bytes in an in-memory buffer BytesIO, use it to upload the contents to the S3 bucket, and finally close it ("The buffer is discarded when the close() method is called."). Remember to call seek(0) method to reset the cursor back to the beginning of the file after you finish writing to the BytesIO stream.

contents = file.file.read()
temp_file = io.BytesIO()
temp_file.write(contents)
temp_file.seek(0)
s3_client.upload_fileobj(temp_file, "local", "myfile.txt")
temp_file.close()

Upvotes: 19

Bastien B
Bastien B

Reputation: 1313

From FastAPI ImportFile:

Import File and UploadFile from fastapi:

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/files/")
async def create_file(file: bytes = File(...)):
    return {"file_size": len(file)}


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    return {"filename": file.filename}

From FastAPI UploadFile:

For example, inside of an async path operation function you can get the contents with:

contents = await myfile.read()

with your code you should have something like this:

async def process(file: UploadFile = File(...)):
    content = await file.read()
    reader = csv.reader(iterdecode(content, "utf-8"), dialect="excel")
    datarows = []
    for row in reader:
        datarows.append(row)
    return datarows

Upvotes: 2

Related Questions