youngminz

Reputation: 1434

asyncio tcp socket: How to cancel asyncio.sleep() when socket is closed?

I am implementing a TCP-based protocol with asyncio in Python. The protocol requires sending a heartbeat message every five minutes. The program below connects to localhost:8889, sends hello, and closes the socket after 1 second. (In real use, a disconnect like this would mean the network is down or the server has dropped the connection.) The problem is that send_heartbeat keeps sleeping for the full 5 minutes without knowing that the socket is gone. When the socket is disconnected, I would like to cancel the coroutine immediately instead of waiting 5 minutes. What is the best way to do that?

import asyncio


async def run(host: str, port: int):
    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)

        except OSError as e:
            print('connection failed:', e)
            await asyncio.sleep(0.5)
            continue

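        # Wrap the coroutines in tasks: asyncio.wait() rejects bare coroutines since Python 3.11.
        await asyncio.wait([
            asyncio.create_task(handle_stream(reader, writer)),
            asyncio.create_task(send_heartbeat(reader, writer)),
        ], return_when=asyncio.FIRST_COMPLETED)  # returns after 1 second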

        writer.close()  # close socket after 1 second
        await writer.wait_closed()


async def handle_stream(reader, writer):
    writer.write(b'hello\n')  # succeeds because the socket is still alive
    await writer.drain()

    await asyncio.sleep(1)


async def send_heartbeat(reader, writer):
    while True:
        await asyncio.sleep(300)

        heartbeat_message = b'heartbeat\n'
        writer.write(heartbeat_message)  # fails because the socket was already closed after 1 second
        await writer.drain()


if __name__ == '__main__':
    asyncio.run(run('127.0.0.1', 8889))

Upvotes: 0

Views: 647

Answers (1)

user4815162342

Reputation: 154906

You can cancel the sleep by canceling the task that executes it. Creating send_heartbeat as a separate task ensures that it runs concurrently with handle_stream while you await the latter:

async def run(host: str, port: int):
    while True:
        ...
        heartbeat = asyncio.create_task(send_heartbeat(reader, writer))
        try:
            await handle_stream(reader, writer)
        finally:
            heartbeat.cancel()
            writer.close()
        await writer.wait_closed()
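
To see the cancellation on its own, here is a minimal sketch that runs without a server. The socket is replaced by stand-ins: this send_heartbeat appends to a list instead of writing, handle_stream just sleeps briefly, and session is a hypothetical helper name. None of these signatures come from the question.

```python
import asyncio
import time


async def send_heartbeat(interval: float, sent: list) -> None:
    """Stand-in heartbeat loop: records to `sent` instead of writing to a socket."""
    while True:
        await asyncio.sleep(interval)
        sent.append('heartbeat')


async def handle_stream() -> None:
    """Stand-in stream handler: pretends the connection ends after 0.1 s."""
    await asyncio.sleep(0.1)


async def session(interval: float = 300.0):
    """Hypothetical helper: one connection's lifetime, minus the real socket."""
    sent = []
    started = time.monotonic()
    heartbeat = asyncio.create_task(send_heartbeat(interval, sent))
    try:
        await handle_stream()
    finally:
        # Interrupts the pending asyncio.sleep(interval) inside the task.
        heartbeat.cancel()
    # Awaiting the cancelled task lets it finish unwinding; it raises CancelledError.
    try:
        await heartbeat
    except asyncio.CancelledError:
        pass
    return heartbeat.cancelled(), sent, time.monotonic() - started


if __name__ == '__main__':
    cancelled, sent, elapsed = asyncio.run(session())
    print(cancelled, sent, f'{elapsed:.1f}s')
```

The session returns well before the 300-second interval elapses and no heartbeat is recorded, because cancel() raises CancelledError at the await asyncio.sleep(...) line inside the task.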

BTW, since you're awaiting writer.drain() inside handle_stream, there is no guarantee that handle_stream will always complete within 1 second. You may want to skip the drain there, or use asyncio.wait_for when awaiting handle_stream(...).
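
A rough sketch of the asyncio.wait_for variant, assuming a stand-in handle_stream whose delay parameter simulates a drain() that stalls. The delay parameter and the run_with_timeout name are illustrative only, not part of the question's code.

```python
import asyncio


async def handle_stream(delay: float) -> str:
    """Stand-in handler: `delay` simulates time spent blocked, e.g. in drain()."""
    await asyncio.sleep(delay)
    return 'done'


async def run_with_timeout(delay: float, timeout: float) -> str:
    """Hypothetical wrapper: bound the handler's running time with wait_for."""
    try:
        return await asyncio.wait_for(handle_stream(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the handler by the time this is raised.
        return 'timed out'


if __name__ == '__main__':
    print(asyncio.run(run_with_timeout(delay=0.01, timeout=1.0)))
    print(asyncio.run(run_with_timeout(delay=10.0, timeout=0.05)))
```

In the real run loop, the timeout handling would sit inside the try block, so the finally clause still cancels the heartbeat and closes the writer.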

Upvotes: 2
