svedel

Reputation: 119

Synchronous execution of a database (Postgres) query using asyncio not working together with the ormar ORM (FastAPI app)

I'm building an API using FastAPI with Postgres and ormar as the ORM (currently using docker-compose to develop locally). Routes are working fine, but I'm having trouble adding JWT user authorization using the natively async ormar to confirm users in the db (I'm following this example but looking to replace the fake_db with calls to the actual db in the authenticate function).

I've tried to use asyncio to convert the database call to a synchronous operation so that I can execute it as part of authenticate, but I keep getting errors suggesting I'm not adding the task to the right thread or pool.

Here's the snippet causing the issue (the error is raised at asyncio.run):

import asyncio 
from app.db import User


def authenticate(email: str, password: str) -> Optional[User]:

    user = await User.objects.filter(email=email).first()
    asyncio.run(user)

    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

And here's the stack trace:

web_1      | Future exception was never retrieved
web_1      | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1      | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1      | INFO:     192.168.176.2:50038 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1      | ERROR:    Exception in ASGI application
web_1      | Traceback (most recent call last):
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1      |     result = await app(self.scope, self.receive, self.send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1      |     return await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1      |     await super().__call__(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1      |     await self.middleware_stack(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1      |     await self.app(scope, receive, _send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1      |     await self.app(scope, receive, sender)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1      |     await route.handle(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1      |     await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1      |     response = await func(request)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1      |     raw_response = await run_endpoint_function(
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1      |     return await run_in_threadpool(dependant.call, **values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1      |     return await loop.run_in_executor(None, func, *args)
web_1      |   File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1      |     result = self.fn(*self.args, **self.kwargs)
web_1      |   File "/app/./app/api/routes/auth.py", line 19, in login
web_1      |     user = authenticate(email=form_data.username, password=form_data.password)
web_1      |   File "/app/./app/core/auth.py", line 35, in authenticate
web_1      |     asyncio.run(user)
web_1      |   File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1      |     return loop.run_until_complete(main)
web_1      |   File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1      |     return future.result()
web_1      |   File "/app/./app/core/auth.py", line 19, in get_user
web_1      |     return await User.objects.get(email=email) #User.objects.filter(email=email).first()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 929, in get
web_1      |     return await self.filter(*args, **kwargs).get()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 945, in get
web_1      |     rows = await self.database.fetch_all(expr)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 142, in fetch_all
web_1      |     return await connection.fetch_all(query, values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 248, in __aexit__
web_1      |     await self._connection.release()
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1      |     self._connection = await self._database._pool.release(self._connection)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release
web_1      |     return await asyncio.shield(ch.release(timeout))
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release
web_1      |     raise ex
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release
web_1      |     await self._con.reset(timeout=budget)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset
web_1      |     await self.execute(reset_query, timeout=timeout)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute
web_1      |     return await self._protocol.query(query, timeout)
web_1      |   File "asyncpg/protocol/protocol.pyx", line 321, in query
web_1      |   File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state
web_1      | asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

How can I run the call to the database synchronously?


UPDATE DECEMBER 28, 2021

I changed to the code below, as suggested by @AndreaTedeschi:

async def get_user_by_email(email: str) -> Optional[User]:
    return await User.objects.get_or_none(email=email) 

def authenticate(email: str, password: str) -> Optional[User]:
    
    user = get_user_by_email(email=email)
    asyncio.run(user)

    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

Now I get an error about a Future being attached to a different loop. Full stack trace:

web_1      | Future exception was never retrieved
web_1      | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1      | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1      | INFO:     172.27.0.3:42414 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1      | ERROR:    Exception in ASGI application
web_1      | Traceback (most recent call last):
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1      |     result = await app(self.scope, self.receive, self.send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1      |     return await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1      |     await super().__call__(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1      |     await self.middleware_stack(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1      |     await self.app(scope, receive, _send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1      |     await self.app(scope, receive, sender)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1      |     await route.handle(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1      |     await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1      |     response = await func(request)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1      |     raw_response = await run_endpoint_function(
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1      |     return await run_in_threadpool(dependant.call, **values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1      |     return await loop.run_in_executor(None, func, *args)
web_1      |   File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1      |     result = self.fn(*self.args, **self.kwargs)
web_1      |   File "/app/./app/api/routes/auth.py", line 19, in login
web_1      |     user = authenticate(email=form_data.username, password=form_data.password)
web_1      |   File "/app/./app/core/auth.py", line 36, in authenticate
web_1      |     asyncio.run(user, debug=True)
web_1      |   File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1      |     return loop.run_until_complete(main)
web_1      |   File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1      |     return future.result()
web_1      |   File "/app/./app/core/auth.py", line 20, in get_user_by_email
web_1      |     return await User.objects.get_or_none(email=email) #User.objects.filter(email=email).first()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 912, in get_or_none
web_1      |     return await self.get(*args, **kwargs)
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 933, in get
web_1      |     return await self.filter(*args, **kwargs).get()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 953, in get
web_1      |     rows = await self.database.fetch_all(expr)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 148, in fetch_all
web_1      |     return await connection.fetch_all(query, values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 264, in __aexit__
web_1      |     await self._connection.release()
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1      |     self._connection = await self._database._pool.release(self._connection)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 867, in release
web_1      |     return await asyncio.shield(ch.release(timeout))
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 224, in release
web_1      |     raise ex
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 214, in release
web_1      |     await self._con.reset(timeout=budget)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1367, in reset
web_1      |     await self.execute(reset_query, timeout=timeout)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 318, in execute
web_1      |     return await self._protocol.query(query, timeout)
web_1      |   File "asyncpg/protocol/protocol.pyx", line 338, in query
web_1      | RuntimeError: Task <Task pending name='Task-34' coro=<PoolConnectionHolder.release() running at /usr/local/lib/python3.8/site-packages/asyncpg/pool.py:214> cb=[shield.<locals>._inner_done_callback() at /usr/local/lib/python3.8/asyncio/tasks.py:885] created at /usr/local/lib/python3.8/asyncio/tasks.py:878> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop

So some progress, but still not quite there. How would I resolve this?

Clearly I'm not an expert on async Python, so I appreciate the help!

Upvotes: 1

Views: 1176

Answers (1)

Andrea Tedeschi

Reputation: 43

The ORM you are using is async, so the methods that query the database are coroutine functions and have to be awaited.
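As a rough illustration of why the asyncio.run attempts in the question fail: the traceback shows FastAPI calling the plain def login through run_in_threadpool, and asyncio.run then starts a second event loop in that worker thread, while the connection pool presumably belongs to the server's loop. The sketch below reproduces the same kind of error using only the standard library. The names wait_for, sync_worker and server are hypothetical, and a bare Future stands in for the asyncpg connection.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def wait_for(fut):
    # Awaiting a pending Future makes the running Task check which loop owns it.
    return await fut


def sync_worker(fut):
    # Mirrors the question: a plain function in a worker thread calls
    # asyncio.run, which creates a brand-new event loop for that call.
    try:
        asyncio.run(wait_for(fut))
    except RuntimeError as exc:
        return str(exc)
    return "no error"


async def server():
    loop = asyncio.get_running_loop()
    # Hypothetical stand-in for a resource created on the server's loop.
    fut = loop.create_future()
    with ThreadPoolExecutor(max_workers=1) as pool:
        message = await loop.run_in_executor(pool, sync_worker, fut)
    fut.cancel()  # tidy up the Future that was never resolved
    return message


message = asyncio.run(server())
print(message)
```

The printed message should end with "attached to a different loop", the same wording as the RuntimeError in the second stack trace.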

You cannot use an await expression in a normal function. You need to define the function with async def in order to make it a coroutine function:

from typing import Optional

async def get_by_username(username: str) -> Optional[User]:
    # get_or_none returns None instead of raising when no row matches
    return await User.objects.get_or_none(username=username)

async def authenticate(username: str, password: str) -> Optional[User]:
    user = await get_by_username(username)
    if not user:
        return None
    # verify the password here
    return user
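Since authenticate is now a coroutine function, its caller has to await it as well, which means the login route from the question's traceback would also need to be declared with async def. FastAPI awaits async def endpoints directly on the server's event loop rather than sending them through run_in_threadpool. Below is a minimal, standard-library-only sketch of that call chain. The User dataclass, _FAKE_TABLE, the toy verify_password and the dict returned by login are all hypothetical stand-ins for the ormar model, the database, the real password check and the real response.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    # Hypothetical stand-in for the ormar model in app.db
    email: str
    hashed_password: str


# Hypothetical in-memory "table" replacing the Postgres lookup
_FAKE_TABLE = {"a@example.com": User("a@example.com", "hashed:secret")}


def verify_password(password: str, hashed_password: str) -> bool:
    # Toy check for illustration only; a real app would use a password hasher
    return hashed_password == f"hashed:{password}"


async def get_user_by_email(email: str) -> Optional[User]:
    await asyncio.sleep(0)  # yield to the loop, as a real async query would
    return _FAKE_TABLE.get(email)


async def authenticate(email: str, password: str) -> Optional[User]:
    user = await get_user_by_email(email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


async def login(email: str, password: str) -> dict:
    # Stand-in for the route handler: async def, so it can await authenticate
    user = await authenticate(email, password)
    if user is None:
        return {"status": 401}
    return {"status": 200, "email": user.email}


ok = asyncio.run(login("a@example.com", "secret"))
denied = asyncio.run(login("a@example.com", "wrong"))
print(ok, denied)
```

Here asyncio.run appears only at the very top level to drive the demo. Inside the FastAPI app the server already owns the running loop, so no asyncio.run call is needed anywhere in the request path.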

Upvotes: 0
