user9532692
user9532692

Reputation: 706

Python Async Error: await only allowed in async function

I am trying read files asynchronously, but I am running into an error saying “await only allowed in async function.” From my understanding, this error occurs when the await keyword is used inside of a function that was not marked as async. However, as shown in the code below, I mark the read_blob function as async

I don’t have any issues running this in .ipynb Jupiter notebook (in which I run the code cell by cell sequentially), but when I try to run it in VSCode in a .py file, it throws an error.

Here is my code:

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):

    async def read_blob(self,
                  blob_name: str,
                  add_blob_name_col=False,
                  add_blob_date_col=False,
                  preprocessing_func=None,
                  zip_regex=r'.+\.gz$',
                  csv_regex='.+\.csv(\.gz)?$',
                  parquet_regex='.+\.parquet$',
                  regex_string=None,
                  **kwargs):

        assert isinstance(blob_name, str), f'{blob_name} is not a string'

        try:
            blob = (await self.download_blob(blob_name))

            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        
        except ResourceNotFoundError:
            return 0


blob_sas_url = "https://proan.blob"
acc = AsyncContainerClient.from_container_url(blob_sas_url)

test_dirs = ["models1/model.parquet", "models2/model.parquet", "models3/model.parquet"]
res = await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

error_img

Upvotes: 1

Views: 1815

Answers (1)

Ron Serruya
Ron Serruya

Reputation: 4426

The entry point to an async program should be asyncio.run, you should wrap your code in an async method then call it

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):

    async def read_blob(self,
                  blob_name: str,
                  add_blob_name_col=False,
                  add_blob_date_col=False,
                  preprocessing_func=None,
                  zip_regex=r'.+\.gz$',
                  csv_regex='.+\.csv(\.gz)?$',
                  parquet_regex='.+\.parquet$',
                  regex_string=None,
                  **kwargs):

        assert isinstance(blob_name, str), f'{blob_name} is not a string'

        try:
            blob = (await self.download_blob(blob_name))

            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        
        except ResourceNotFoundError:
            return 0


async def main():
    blob_sas_url = "https://proan.blob"
    acc = AsyncContainerClient.from_container_url(blob_sas_url)

    test_dirs = ["models1/model.parquet", "models2/model.parquet", 
    "models3/model.parquet"]
    return await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

asyncio.run(main())

Upvotes: 3

Related Questions