ChrisC

Reputation: 21

How to upload file in chunks to Google Cloud Storage using FastAPI's request.stream()?

I have a data flow in a FastAPI application where a potentially very large HTTP PUT body is being uploaded, and I need to stream this to Google Cloud Storage. I'm using the gcloud.aio.storage library for interacting with Google Cloud Storage, as it has asyncio support (it's based on aiohttp). I'm trying to stream the body through, so that I wouldn't need to have the whole body stored in memory at once.

The problem that I'm having is that FastAPI's request.stream() returns an async generator, while the storage client expects either raw data as bytes or a string, or something based on the io.IOBase class.

So, if I call the below:

async def upload(request: fastapi.Request, storage_client: gcloud.aio.storage.Storage):
    upload_stream = request.stream()
    await storage_client.upload(
        "my_bucket",
        "my_object",
        upload_stream
    )

it fails with TypeError: unsupported upload type: "<class 'async_generator'>".

How would I get an io.IOBase-derived stream object that will be accepted, without reading the whole request body at once?

Upvotes: 1

Views: 234

Answers (1)

Chris

Reputation: 34551

To upload a file in chunks to Google Cloud Storage and actually benefit from using FastAPI/Starlette's request.stream()—which lets you receive the file/request body in chunks in your FastAPI backend, thus avoiding loading the entire file/request body into memory (see the "Update" section of this answer for more details)—you should rather use resumable uploads. Note that, as described in this comment, one had to pay for multiple resumable upload operations in the past. However, this might have changed since then, so you should check for any recent updates concerning that matter.

Below is an example as given in this article, but adapted to a FastAPI application. The chunk size is set to 256 KB, as suggested in the official documentation for uploading data in multiple chunks. Larger chunk sizes typically make uploads faster, but note that there is a tradeoff between speed and memory usage. The example is implemented using Google's official Python Client for Google Cloud Storage (see the source code of the python-storage package, along with the given samples; python-storage is also part of the google-cloud-python package). For resumable uploads, Google's google-resumable-media-python package is used, which has asyncio support, but is still in development.

Since the example below does not use the asynchronous method for resumable uploads, a chunk upload operation could block the main thread until it is completed. Thus, one might have a look at this answer and the solutions provided in it for running blocking I/O-bound or CPU-bound operations within async def endpoints (e.g., await run_in_threadpool(s.write, chunk)); a sketch of such a variant is given after the app.py example further below. However, when the async for block in the example below requests the next chunk from the asynchronous iterator (i.e., request.stream()), the surrounding coroutine is suspended and function control is passed back to the event loop, thus allowing other tasks in the event loop to run until that operation is completed. Hence, even though the event loop might get blocked when the synchronous s.write(chunk) is called, the async for block would still give up time for other tasks in the event loop to run.

gcs.py

from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from google.cloud import storage


class GCSObjectStreamUpload(object):
    def __init__(
            self, 
            client: storage.Client,
            bucket_name: str,
            blob_name: str,
            chunk_size: int = 256 * 1024
        ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials
        )
        self._request = None  # type: requests.ResumableUpload


    def __enter__(self):
        self.start()
        return self


    def __exit__(self, exc_type, *_):
        if exc_type is None:
            self.stop()


    def start(self):
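        # Initiate the resumable upload session against the GCS JSON API;
        # the actual data is transmitted chunk by chunk via write()/read() below.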
        url = (
            f'https://storage.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url, chunk_size=self._chunk_size
        )
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )


    def stop(self):
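        # Transmit whatever is left in the buffer as the final chunk,
        # finalizing the resumable upload.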
        self._request.transmit_next_chunk(self._transport)


    def write(self, data: bytes) -> int:
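        # Buffer the incoming data; whenever at least one full chunk is
        # available, transmit it (transmit_next_chunk() calls read() below
        # to pull the data out of the buffer).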
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len


    def read(self, chunk_size: int) -> bytes:
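        # Called by the ResumableUpload object to consume up to chunk_size
        # bytes from the internal buffer.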
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()


    def tell(self) -> int:
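        # Called by the ResumableUpload object to determine how many bytes
        # have been read from the stream so far.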
        return self._read

app.py

from fastapi import FastAPI, Request, HTTPException
from gcs import GCSObjectStreamUpload
from google.cloud import storage

app = FastAPI()

        
@app.post('/upload')
async def upload(request: Request):
    try:
        client = storage.Client()
        with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
            async for chunk in request.stream():
                s.write(chunk)
    except Exception:
        raise HTTPException(status_code=500, detail='Something went wrong')
    
    return {"message": "File successfully uploaded"}
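
If you would rather offload the blocking s.write() call to an external thread, as mentioned earlier, a sketch of such a variant is given below. It uses Starlette's run_in_threadpool(), which FastAPI re-exports under fastapi.concurrency, and assumes the same GCSObjectStreamUpload class from gcs.py above.

from fastapi import FastAPI, Request, HTTPException
from fastapi.concurrency import run_in_threadpool
from gcs import GCSObjectStreamUpload
from google.cloud import storage

app = FastAPI()


@app.post('/upload')
async def upload(request: Request):
    try:
        client = storage.Client()
        with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
            async for chunk in request.stream():
                # Run the blocking write (which may transmit a chunk over the
                # network) in an external thread, keeping the event loop free.
                await run_in_threadpool(s.write, chunk)
    except Exception:
        raise HTTPException(status_code=500, detail='Something went wrong')

    return {"message": "File successfully uploaded"}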

Other approaches for uploading files to Google Cloud Storage can be found in this answer.
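
As one such approach, recent versions of the google-cloud-storage package provide a file-like interface through Blob.open(), which, in 'wb' mode, returns a writer that buffers the incoming data and uploads it in chunks using a resumable upload behind the scenes. A minimal sketch, assuming the same bucket/blob names as above, is given below; check the package documentation for the exact version requirements.

from fastapi import FastAPI, Request
from google.cloud import storage

app = FastAPI()


@app.post('/upload_fileio')
async def upload_fileio(request: Request):
    client = storage.Client()
    blob = client.bucket('test-bucket').blob('test-blob')
    # blob.open('wb') returns a file-like writer that buffers the data and
    # uploads it in chunks via a resumable upload session.
    with blob.open('wb') as f:
        async for chunk in request.stream():
            f.write(chunk)  # blocking call; could be offloaded with run_in_threadpool as well
    return {"message": "File successfully uploaded"}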

Upvotes: 0
