Artiom  Kozyrev
Artiom Kozyrev

Reputation: 3836

Why asyncio.sleep freezes whole Task (which has websocket inside) in running state when used with aiohttp websockets?

Today I found very strange problem with asyncio or aiohttp.

I wrote very simple server and client which use Websockets. When server gets connection from client, it creates two tasks, one task listens to data from client, another one send data to client.

If client decides to finish session, it sends close to server, listen_on_socket (server) Task finishes fine, but send_to_socket (server) Task became frozen if it contains asyncio.sleep inside of the Task. I can not even cancel the frozen task.

What's the reason of the problem and how can I handle it?

I have the following aiohttp server code as example:

from aiohttp import web, WSMsgType
import asyncio


async def send_to_socket(ws: web.WebSocketResponse):
    """helper func which send messages to socket"""
    for i in range(10):
        try:
            if ws.closed:
                break
            else:
                await ws.send_str(f"I am super socket server-{i} !!!")
        except Exception as ex:
            print(ex)
            break
        # remove await asyncio.sleep(0.5) and it works !
        print("| send_to_socket | St sleeping")
        await asyncio.sleep(0.5)
        print("| send_to_socket | Stopped sleeping")  # you will not get the message
    if not ws.closed:
        await ws.send_str("close")

    print("| send_to_socket | Finished sending")


async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
    """helper func which Listen messages to socket"""
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
                send_task.cancel()
                print(send_task.cancelled(), send_task.done(), send_task)
                break
        elif msg.type == WSMsgType.ERROR:
            print(f'ws connection closed with exception {ws.exception()}')
    print("* listen_on_socket * Finished listening")


async def websocket_handler(req:  web.Request) -> web.WebSocketResponse:
    """Socket aiohttp handler"""
    ws = web.WebSocketResponse()
    print(f"Handler | Started websocket: {id(ws)}")
    await ws.prepare(req)
    t = asyncio.create_task(send_to_socket(ws))
    await asyncio.gather(listen_on_socket(ws, t), t)
    print("Handler | websocket connection closed")
    return ws

if __name__ == '__main__':
    app = web.Application()
    app.router.add_get("/socket", websocket_handler)
    web.run_app(app, host="0.0.0.0", port=9999)

I have the following aiohttp client code as example:

from aiohttp import ClientSession
import aiohttp
import asyncio


async def client():
    n = 3
    async with ClientSession() as session:
        async with session.ws_connect('http://localhost:9999/socket') as ws:
            async for msg in ws:
                if n == 0:
                    await ws.send_str("close")
                    break
                if msg.type == aiohttp.WSMsgType.TEXT:
                    if msg.data == "close":
                        await ws.close()
                        break
                    else:
                        print(msg.data)
                        n -= 1
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break
    print("Client stopped")

if __name__ == '__main__':
    asyncio.run(client())

Upvotes: 0

Views: 1278

Answers (2)

Artiom  Kozyrev
Artiom Kozyrev

Reputation: 3836

From aiohttp documentation: Reading from the WebSocket (await ws.receive()) must only be done inside the request handler task; however, writing (ws.send_str(...)) to the WebSocket, closing (await ws.close()) and canceling the handler task may be delegated to other tasks.

Hereby the mistake was that I created reading from ws task in listen_on_socket.

Solution. Changes only in server, client is the same:

from aiohttp import web, WSMsgType
import asyncio


async def send_to_socket(ws: web.WebSocketResponse):
    """helper func which send messages to socket"""
    for i in range(4):
        try:
            if ws.closed:
                break
            else:
                await ws.send_str(f"I am super socket server-{i} !!!")
        except Exception as ex:
            print(ex)
            break
        await asyncio.sleep(1.5)
    if not ws.closed:
        await ws.send_str("close")

    print(f"| send_to_socket | Finished sending {id(ws)}")


async def websocket_handler(req:  web.Request) -> web.WebSocketResponse:
    """Socket aiohttp handler"""
    ws = web.WebSocketResponse()
    print(f"Handler | Started websocket: {id(ws)}")
    await ws.prepare(req)
    # create another task for writing
    asyncio.create_task(send_to_socket(ws))
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
                break
        elif msg.type == WSMsgType.ERROR:
            print(f'ws connection closed with exception {ws.exception()}')
    print(f"Connection {id(ws)} is finished")
    return ws

if __name__ == '__main__':
    app = web.Application()
    app.router.add_get("/socket", websocket_handler)
    web.run_app(app, host="0.0.0.0", port=9999)

Upvotes: 0

NobbyNobbs
NobbyNobbs

Reputation: 1434

It isn't freezes, just your cancellation and logging a bit incorrect, you should await for cancelled task

async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
    """helper func which Listen messages to socket"""
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
                send_task.cancel()
                try:
                    await send_task
                except asyncio.CancelledError:
                    print("send task cancelled")
                print(send_task.cancelled(), send_task.done(), send_task)
                break
        elif msg.type == WSMsgType.ERROR:
            print(f'ws connection closed with exception {ws.exception()}')
    print("* listen_on_socket * Finished listening")

Also there should be set return_exceptions=True in the gather call inside the websocket_handler to prevent exception propagation.

You could just wrap all the function body with try-finally block and ensure it finishes fine (sure just for debugging, not in final implementation).

Upvotes: 1

Related Questions