Reputation: 21
I have a data flow in a FastAPI application where a potentially very large HTTP PUT body is being uploaded, and I need to stream this to Google Cloud Storage. I'm using the gcloud.aio.storage library for interacting with Google Cloud Storage, as it has asyncio support (it's based on aiohttp). I'm trying to stream the body through, so that I wouldn't need to have the whole body stored in memory at once.
The problem that I'm having is that FastAPI's request.stream() returns an async generator, while the storage client expects either raw data as bytes or string, or something based on the io.IOBase class.
So, if I call the below:
async def upload(request: fastapi.Request, storage_client: gcloud.aio.storage.Storage):
    upload_stream = request.stream()
    await storage_client.upload(
        "my_bucket",
        "my_object",
        upload_stream
    )
it fails with TypeError: unsupported upload type: "<class 'async_generator'>".
How would I get an io.IOBase-derived stream object that will be accepted, without reading the whole request body at once?
Upvotes: 1
Views: 234
Reputation: 34551
To upload a file in chunks to Google Cloud Storage and actually benefit from using FastAPI/Starlette's request.stream(), which allows you to receive the file/request body in chunks in your FastAPI backend and thus avoid loading the entire file/request body into memory (see the "Update" section of this answer for more details), you should rather use resumable uploads. Note that, as described in this comment, one had to pay for multiple resumable upload operations in the past. However, this might have changed since then, so you should check for any recent updates on that matter.
Below is an example, as given in this article, adapted to a FastAPI application. The chunk size is set to 256 KB, as suggested in the official documentation for uploading a file in multiple chunks. Larger chunk sizes typically make uploads faster, but note that there is a tradeoff between speed and memory usage. The example is implemented using Google's official Python client for Google Cloud Storage (see the source code of the python-storage package along with the given samples; python-storage is also part of the google-cloud-python package). For resumable uploads, Google's google-resumable-media-python package is used, which has asyncio support, but is still in development.
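Both libraries can be installed from PyPI, e.g., with pip install google-cloud-storage, which also pulls in google-resumable-media as a dependency.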
Since the example below does not use the asynchronous method for resumable uploads, the main thread could get blocked until a chunk upload operation is completed. Thus, one might have a look at this answer and the solutions provided in it for running blocking I/O-bound or CPU-bound operations within async def endpoints (e.g., await run_in_threadpool(s.write, chunk)); an endpoint adapted in that way is sketched after the example below. However, when the async for block in the example requests the next chunk from the asynchronous iterator (i.e., request.stream()), the surrounding coroutine is suspended and function control is passed back to the event loop, thus allowing other tasks in the event loop to run until that operation is completed. Hence, even though the event loop might get blocked when the synchronous s.write(chunk) is called, the async for block would still give up time for other tasks in the event loop to run.
gcs.py
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from google.cloud import storage


class GCSObjectStreamUpload(object):
    def __init__(
        self,
        client: storage.Client,
        bucket_name: str,
        blob_name: str,
        chunk_size: int = 256 * 1024
    ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials
        )
        self._request = None  # type: requests.ResumableUpload

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, *_):
        if exc_type is None:
            self.stop()

    def start(self):
        # Initiate the resumable upload session, using this object
        # itself as the stream that chunks are read from.
        url = (
            f'https://storage.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url, chunk_size=self._chunk_size
        )
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )

    def stop(self):
        # Transmit whatever is left in the buffer as the final chunk.
        self._request.transmit_next_chunk(self._transport)

    def write(self, data: bytes) -> int:
        # Buffer incoming data and transmit full chunks as they accumulate.
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len

    def read(self, chunk_size: int) -> bytes:
        # Called by the resumable upload machinery to pull data out of the buffer.
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()

    def tell(self) -> int:
        return self._read
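The class acts as a context manager and writable stream; hence, just to illustrate how it behaves outside of FastAPI, a minimal standalone sketch (using a hypothetical test-bucket/test-blob pair) could look like the following:
from google.cloud import storage
from gcs import GCSObjectStreamUpload

client = storage.Client()

# Hypothetical bucket/object names, used only for illustration.
with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
    for _ in range(1024):
        # Write 1 KB at a time; full 256 KB chunks are transmitted as the buffer fills,
        # and the remainder is sent when the context manager exits.
        s.write(b'x' * 1024)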
app.py
from fastapi import FastAPI, Request, HTTPException
from gcs import GCSObjectStreamUpload
from google.cloud import storage

app = FastAPI()


@app.post('/upload')
async def upload(request: Request):
    try:
        client = storage.Client()
        with GCSObjectStreamUpload(client=client, bucket_name='test-bucket', blob_name='test-blob') as s:
            async for chunk in request.stream():
                s.write(chunk)
    except Exception:
        raise HTTPException(status_code=500, detail='Something went wrong')
    return {"message": "File successfully uploaded"}
Other approaches for uploading files to Google Cloud Storage can be found in this answer.
Upvotes: 0