Reputation: 119
I'm building an API using fastapi with postgres and ormar as the ORM (currently using docker-compose to develop locally). Routes are working fine, but I'm having trouble adding JWT user authorization using the natively async ormar to confirm users in the db (I'm following this example but looking to replace the fake_db with calls to the actual db in the authenticate function).
I've tried to use asyncio to convert the database call to a synchronous operation in order to execute it as part of authenticate, but I keep getting errors suggesting I'm not adding the task to the right thread or pool.
Here's the snippet causing the issue (specifically, asyncio.run is where it happens):
import asyncio
from app.db import User

def authenticate(email: str, password: str) -> Optional[User]:
    user = await User.objects.filter(email=email).first()
    asyncio.run(user)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
and here's the stack trace
web_1 | Future exception was never retrieved
web_1 | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1 | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1 | INFO: 192.168.176.2:50038 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1 | ERROR: Exception in ASGI application
web_1 | Traceback (most recent call last):
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1 | result = await app(self.scope, self.receive, self.send)
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1 | return await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1 | await super().__call__(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1 | await self.middleware_stack(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1 | await self.app(scope, receive, _send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1 | await self.app(scope, receive, sender)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1 | await route.handle(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1 | await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1 | response = await func(request)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1 | raw_response = await run_endpoint_function(
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1 | return await run_in_threadpool(dependant.call, **values)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1 | return await loop.run_in_executor(None, func, *args)
web_1 | File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1 | result = self.fn(*self.args, **self.kwargs)
web_1 | File "/app/./app/api/routes/auth.py", line 19, in login
web_1 | user = authenticate(email=form_data.username, password=form_data.password)
web_1 | File "/app/./app/core/auth.py", line 35, in authenticate
web_1 | asyncio.run(user)
web_1 | File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1 | return loop.run_until_complete(main)
web_1 | File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1 | return future.result()
web_1 | File "/app/./app/core/auth.py", line 19, in get_user
web_1 | return await User.objects.get(email=email) #User.objects.filter(email=email).first()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 929, in get
web_1 | return await self.filter(*args, **kwargs).get()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 945, in get
web_1 | rows = await self.database.fetch_all(expr)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 142, in fetch_all
web_1 | return await connection.fetch_all(query, values)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 248, in __aexit__
web_1 | await self._connection.release()
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1 | self._connection = await self._database._pool.release(self._connection)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release
web_1 | return await asyncio.shield(ch.release(timeout))
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release
web_1 | raise ex
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release
web_1 | await self._con.reset(timeout=budget)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset
web_1 | await self.execute(reset_query, timeout=timeout)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute
web_1 | return await self._protocol.query(query, timeout)
web_1 | File "asyncpg/protocol/protocol.pyx", line 321, in query
web_1 | File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state
web_1 | asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
How can I run the call to the database synchronously?
UPDATE DECEMBER 28, 2021
Changing to below code as suggested by @AndreaTedeschi
async def get_user_by_email(email: str) -> Optional[User]:
    return await User.objects.get_or_none(email=email)

def authenticate(email: str, password: str) -> Optional[User]:
    user = get_user_by_email(email=email)
    asyncio.run(user)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
I get an error about not attaching to the right loop. Full stack trace:
web_1 | Future exception was never retrieved
web_1 | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1 | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1 | INFO: 172.27.0.3:42414 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1 | ERROR: Exception in ASGI application
web_1 | Traceback (most recent call last):
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1 | result = await app(self.scope, self.receive, self.send)
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1 | return await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1 | await super().__call__(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1 | await self.middleware_stack(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1 | await self.app(scope, receive, _send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1 | await self.app(scope, receive, sender)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1 | await route.handle(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1 | await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1 | response = await func(request)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1 | raw_response = await run_endpoint_function(
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1 | return await run_in_threadpool(dependant.call, **values)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1 | return await loop.run_in_executor(None, func, *args)
web_1 | File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1 | result = self.fn(*self.args, **self.kwargs)
web_1 | File "/app/./app/api/routes/auth.py", line 19, in login
web_1 | user = authenticate(email=form_data.username, password=form_data.password)
web_1 | File "/app/./app/core/auth.py", line 36, in authenticate
web_1 | asyncio.run(user, debug=True)
web_1 | File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1 | return loop.run_until_complete(main)
web_1 | File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1 | return future.result()
web_1 | File "/app/./app/core/auth.py", line 20, in get_user_by_email
web_1 | return await User.objects.get_or_none(email=email) #User.objects.filter(email=email).first()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 912, in get_or_none
web_1 | return await self.get(*args, **kwargs)
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 933, in get
web_1 | return await self.filter(*args, **kwargs).get()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 953, in get
web_1 | rows = await self.database.fetch_all(expr)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 148, in fetch_all
web_1 | return await connection.fetch_all(query, values)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 264, in __aexit__
web_1 | await self._connection.release()
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1 | self._connection = await self._database._pool.release(self._connection)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 867, in release
web_1 | return await asyncio.shield(ch.release(timeout))
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 224, in release
web_1 | raise ex
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 214, in release
web_1 | await self._con.reset(timeout=budget)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1367, in reset
web_1 | await self.execute(reset_query, timeout=timeout)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 318, in execute
web_1 | return await self._protocol.query(query, timeout)
web_1 | File "asyncpg/protocol/protocol.pyx", line 338, in query
web_1 | RuntimeError: Task <Task pending name='Task-34' coro=<PoolConnectionHolder.release() running at /usr/local/lib/python3.8/site-packages/asyncpg/pool.py:214> cb=[shield.<locals>._inner_done_callback() at /usr/local/lib/python3.8/asyncio/tasks.py:885] created at /usr/local/lib/python3.8/asyncio/tasks.py:878> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop
So some progress, but still not quite there. How would I resolve this?
Clearly I'm not an expert on async python so I appreciate the help!
Upvotes: 1
Views: 1176
Reputation: 43
The ORM you are using is async, so the methods that query the database are coroutine functions and have to be awaited.
You cannot use an await expression in a normal function. You need to define the function with the async keyword
in order to make it a coroutine function:
from typing import Optional
from app.db import User  # the ormar model from the question

async def get_by_username(username: str) -> Optional[User]:
    return await User.objects.get_or_none(username=username)

async def authenticate(username: str, password: str) -> Optional[User]:
    user = await get_by_username(username)
    if not user:
        return None
    # verify the password here
    return user
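The same idea has to carry all the way up to the route handler, since whatever calls authenticate must await it too. Below is a minimal, self-contained sketch of that chain. It makes several assumptions so it can run without a database or web server: the User dataclass, the _USERS dict, hash_password, and the login coroutine are hypothetical stand-ins for the ormar model, the Postgres table, the real password hasher, and the FastAPI route.

```python
import asyncio
import hashlib
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class User:
    # Hypothetical stand-in for the ormar model in app.db
    email: str
    hashed_password: str


def hash_password(password: str) -> str:
    # Illustration only: a real app should use a dedicated password hasher
    return hashlib.sha256(password.encode()).hexdigest()


def verify_password(password: str, hashed_password: str) -> bool:
    return hash_password(password) == hashed_password


# Hypothetical in-memory table standing in for Postgres
_USERS: Dict[str, User] = {
    "a@example.com": User("a@example.com", hash_password("secret")),
}


async def get_user_by_email(email: str) -> Optional[User]:
    # In the real app this line would be:
    #     return await User.objects.get_or_none(email=email)
    await asyncio.sleep(0)  # yield to the event loop, as a real query would
    return _USERS.get(email)


async def authenticate(email: str, password: str) -> Optional[User]:
    # Awaited directly: no asyncio.run and no second event loop
    user = await get_user_by_email(email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


async def login(email: str, password: str) -> dict:
    # Stand-in for the route handler: a coroutine, so it can await authenticate
    user = await authenticate(email, password)
    if user is None:
        return {"status": 401}
    return {"status": 200, "email": user.email}
```

The traceback in the question shows login being called through run_in_threadpool, which is what FastAPI does for an endpoint declared with a plain def. Declaring the route with async def and awaiting authenticate keeps the query on the server's own event loop, so asyncio.run is not needed at all.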
Upvotes: 0