Wichert Akkerman
Wichert Akkerman

Reputation: 5318

asyncio.subprocess always blocks

I need to write some async code which runs a subprocess as part of its tasks. Even though I am using asyncio.subprocess my code is still blocking. My server looks like this:

import asyncio
import asyncio.subprocess
import websockets

async def handler(websocket, path):
    while True:
        data = await websocket.recv()
        print('I received a message')
        player = await asyncio.create_subprocess_exec(
            'sleep', '5',
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL)
        await player.wait()
        print('Finished waiting')

server = websockets.serve(handler, '0.0.0.0', '8000')
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()

And a very basic client:

import asyncio import websockets

async def client():
    async with websockets.connect('ws://localhost:8000') as websocket:
        for i in range(5):
            await websocket.send('message')
            await asyncio.sleep(0.5)

asyncio.get_event_loop().run_until_complete(client())

I would expect the output to look like this:

I received a message
I received a message
I received a message
I received a message
I received a message
Finished waiting
Finished waiting
Finished waiting
Finished waiting
Finished waiting

But instead I get this:

I received a message
Finished waiting
I received a message
Finished waiting
I received a message
Finished waiting
I received a message
Finished waiting
I received a message
Finished waiting

With a 5 second wait after each "I received a message" line.

Upvotes: 2

Views: 1513

Answers (2)

Udi
Udi

Reputation: 30512

The line await player.wait() does not block other async operations, but waits for 5 seconds!

If you don't want to wait for the response, try using ensure_future() instead:

# add:
async def wait_for_player(player, path):
    print("Waiting...", path)
    await player.wait()
    print("Done", path)

# and replace await player.wait() with:
asyncio.ensure_future(wait_for_player(player, path))

You can actually also move create_subprocess_exec() to wait_for_player().


To see your code is not blocking see try these:

Client:

import asyncio

import websockets


async def client(n):
    async with websockets.connect('ws://localhost:8000/{}/'.format(n)) as websocket:
        print(n, "start")
        for i in range(5):
            print(n, i)
            await websocket.send('message')
            await asyncio.sleep(0.5)
    print(n, "done")


tasks = [client(i) for i in range(5)]
asyncio.get_event_loop().run_until_complete(asyncio.wait(tasks))

Server:

import asyncio
import asyncio.subprocess

import random
import websockets


async def handler(websocket, path):
    try:
        while True:
            data = await websocket.recv()
            pause = random.randint(1, 5)
            print('I received a message', path, "Pausing:", pause)
            player = await asyncio.create_subprocess_exec(
                'sleep', str(pause),
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL)
            await player.wait()
            print('Finished waiting', path)
    except websockets.ConnectionClosed:
        print("Connection closed!", path)


server = websockets.serve(handler, '0.0.0.0', '8000')
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()

Upvotes: 2

Amin Etesamian
Amin Etesamian

Reputation: 3699

Your ws server seems ok. Actually it is your client that is blocking. If you want to test the async behavior of your server, You need to make asynchronous requests. The for loop in your client blocks the thread. So remove it and instead, use asyncio.gather to run your client() method 5 times asynchronously

import asyncio
import websockets

async def client():
    async with websockets.connect('ws://localhost:8000') as websocket:
        await websocket.send('message')
        await asyncio.sleep(0.5)


tasks = asyncio.gather(*[client() for i in range(5)])
asyncio.get_event_loop().run_until_complete(tasks)

Upvotes: 0

Related Questions