dazww

Reputation: 53

Asyncio, asyncpg and aiohttp: one event loop for a connection pool

I've been struggling to find a solution to my problem, and I hope I've come to the right place.

I have a Django REST framework API which connects to a PostgreSQL database, and I run bots against my own API to perform tasks. Here is my code:

import asyncio

def get_or_create_eventloop():
    """Return the current thread's event loop, creating one if none exists."""
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # Non-main threads have no default loop, so create and register one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop

My DB class, which uses asyncpg to connect and create a pool:

import asyncpg

class DB:
    def __init__(self, loop):
        # Block until the pool has been created on the given loop.
        self.pool = loop.run_until_complete(self.connect_to_db())

    async def connect_to_db(self):
        return await asyncpg.create_pool(host="host",
                                         database="database",
                                         user="username",
                                         password="pwd",
                                         port=5432)

My API class:

import nest_asyncio
from rest_framework.views import APIView

class Api(APIView):
    # create an event loop, since this is not the main thread
    loop = get_or_create_eventloop()
    nest_asyncio.apply()  # to avoid the "loop already running" problem

    # initialise the DB pool once so I won't have to connect on each request
    db_object = DB(loop)

    def post(self, request):
        ...  # I want to be able to call "do_something()"

    async def do_something(self):
        ...

I have my bots running and sending POST/GET requests to my Django API via aiohttp.

The problem I'm facing is calling do_something() from post(). This is what I have tried so far, without success:

def post(self,request):
    self.loop.run_until_complete(self.do_something())

This raises:


RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

which I understand: we are probably trying to use the event loop from another thread.
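For illustration, one common way around this error is to give the loop its own thread and submit coroutines to it with asyncio.run_coroutine_threadsafe, which is the thread-safe entry point. This is only a minimal sketch and not the code from the question: LoopThread and the body of do_something are hypothetical stand-ins, and a real pool would have to be created on that same loop.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: owns one event loop running in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        """Submit a coroutine from any thread and block until it finishes."""
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout)

    def stop(self):
        """Stop the loop from outside its thread, then clean up."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def do_something(x):
    # Stand-in for a query against a pool created on this same loop.
    await asyncio.sleep(0)
    return x * 2


runner = LoopThread()
result = runner.run(do_something(21))  # called from the main thread
runner.stop()
```

With this shape, a synchronous view could call runner.run(...) from whichever thread handles the request, because the loop itself is only ever touched through the thread-safe API.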

I also tried to use async_to_sync from Django:

@async_to_sync
async def post(..):
    resp = await self.do_something()

The problem here is that async_to_sync creates a new event loop for the thread, so I can't access my DB pool, which is bound to the original loop.

Edit: see https://github.com/MagicStack/asyncpg/issues/293 for that (I would love to implement something like that but can't find a way).

Here is a quick example of one of my bots (basic stuff):
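The "new event loop per call" behaviour can be reproduced with the standard library alone. The sketch below uses asyncio.run as a stand-in for async_to_sync (an assumption made only so the example runs without Django), and handler and seen_loops are hypothetical names:

```python
import asyncio

seen_loops = []


async def handler():
    # Record which event loop this call is running on.
    seen_loops.append(asyncio.get_running_loop())


# asyncio.run() starts a fresh loop for each call and closes it afterwards.
asyncio.run(handler())
asyncio.run(handler())

# The two calls ran on different loop objects.
same_loop = seen_loops[0] is seen_loops[1]
```

Anything bound to one specific loop, such as a pool created up front on another loop, is therefore not usable from inside these calls.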


import asyncio
from aiohttp import ClientSession


async def send_req(url, session):
    # Send one POST request and return the response body as text.
    async with session.post(url=url) as resp:
        return await resp.text()


async def run(r):
    url = "http://localhost:8080/"
    tasks = []

    # Share one session across all requests and run them concurrently.
    async with ClientSession() as session:
        for i in range(r):
            task = asyncio.create_task(send_req(url, session))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        print(responses)


if __name__ == '__main__':
    asyncio.run(run(10))  # 10 is an arbitrary example request count


Thank you in advance

Upvotes: 2

Views: 1442

Answers (1)

dazww

Reputation: 53

After days of looking for an answer, I found the solution to my problem: I used the psycopg3 package instead of asyncpg. Now I can put @async_to_sync on my post function and it works.

Upvotes: 2
