Reputation: 53
I've been struggling to find a solution to my problem; I hope I've come to the right place.
I have a Django REST framework API that connects to a PostgreSQL database, and I run bots against my own API. Here is my code:
import asyncio

def get_or_create_eventloop():
    """Return this thread's event loop, creating one if it does not exist."""
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # No loop is set for this thread yet, so create and register one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return asyncio.get_event_loop()
My DB class, which uses asyncpg to connect and create a pool:
import asyncpg

class DB:
    def __init__(self, loop):
        # Block until the pool has been created on the given loop.
        self.pool = loop.run_until_complete(self.connect_to_db())

    async def connect_to_db(self):
        return await asyncpg.create_pool(host="host",
                                         database="database",
                                         user="username",
                                         password="pwd",
                                         port=5432)
My API class:
import nest_asyncio
from rest_framework.views import APIView

class Api(APIView):
    # Create an event loop, since this is not the main thread
    loop = get_or_create_eventloop()
    nest_asyncio.apply()  # to avoid the "loop already running" problem
    # Initialise the DB pool once so I won't have to connect each time
    db_object = DB(loop)

    def post(self, request):
        ...  # I want to be able to call "do_something()"

    async def do_something(self):
        ...
My bots are running and sending POST/GET requests to my Django API via aiohttp.
The problem I'm facing is calling do_something() from post(). This is what I have tried so far, without success:
def post(self, request):
    self.loop.run_until_complete(self.do_something())
This raises:
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
which I understand: we are probably trying to use the event loop from a different thread.
I also tried to use async_to_sync from Django:
@async_to_sync
async def post(..):
    resp = await self.do_something()
The problem here is that async_to_sync creates a new event loop for the thread, so I won't be able to access my DB pool. Edit: see https://github.com/MagicStack/asyncpg/issues/293 for that (I would love to implement something like that, but I can't find a way).
Here is a quick example of one of my bots (basic stuff):
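To illustrate the "new event loop" point, here is a small sketch that uses asyncio.run as a stand-in for async_to_sync, assuming (as described above) that the wrapper drives the coroutine on a fresh loop. The helper which_loop is hypothetical. Each call ends up on a different loop object, so anything tied to an earlier loop is no longer running on the current one.

```python
import asyncio

async def which_loop():
    # Return the loop that is actually running this coroutine.
    return asyncio.get_running_loop()

# Each asyncio.run() call creates, uses, and then closes its own loop.
loop_a = asyncio.run(which_loop())
loop_b = asyncio.run(which_loop())

different_loops = loop_a is not loop_b
print(different_loops)
```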
import asyncio
from aiohttp import ClientSession

async def send_req(url, session):
    async with session.post(url=url) as resp:
        return await resp.text()

async def run(r):
    url = "http://localhost:8080/"
    tasks = []
    async with ClientSession() as session:
        # Fire r requests concurrently over one shared session.
        for i in range(r):
            task = asyncio.create_task(send_req(url, session))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        print(responses)

if __name__ == '__main__':
    asyncio.run(run(10))  # 10 requests (arbitrary example value)
Thank you in advance
Upvotes: 2
Views: 1442
Reputation: 53
After days of looking for an answer, I found a solution to my problem: I switched from asyncpg to the psycopg3 package. Now I can put @async_to_sync on my post function and it works.
Upvotes: 2