ronrest
ronrest

Reputation: 1222

Websockets, handling user input asynchronously. Getting ConnectionClosedError 1006

I am trying to run a websocket client that continuously takes user input from the terminal, and sends it to the server.

It works perfectly fine as long as the person is entering content on a regular basis. But the following error message is thrown if the user fails to send a message for more than about 20 seconds.

websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason

I suspect it might be because the input() function that waits for user input is a blocking function. Perhaps this blocks the client from pinging the server?

I have tried to wrap the input() function that waits for user input inside of an asynchronous function. But I still get the same issue.

How do I get around this issue?

Here is the code i have used.

Code

CLIENT

import asyncio
import websockets

async def producer():
    return input("Enter something: ")

async def echo_loop():
    uri = f"wss://192.168.1.2:5432"
    async with websockets.connect(uri, ssl=None) as websocket:
        while True:
            # Get user input
            # msg = input("Enter something: ")
            msg = await producer()

            # Send message to the server
            await websocket.send(msg)
            print(f"> {msg}")

            # Get feedback from server
            feedback = await websocket.recv()
            print(f"< {feedback}")


asyncio.get_event_loop().run_until_complete(echo_loop())
asyncio.get_event_loop().run_forever()

SERVER

import asyncio
import websockets

async def echo(websocket, path):
    async for msg in websocket:
        # Send Feedback of message to client
        feedback = f"RECEIVED: {msg}!"
        await websocket.send(feedback)
        print(f"> {feedback}")

start_server = websockets.serve(echo, host=None, port=5432, ssl=None)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Additional Notes:

As per the documentation FAQ I am aware that the ConnectionClosedError: code = 1006 can sometimes occur when there is a network issue, eg if:

However, this is not an internet or proxy issue that is causing it.

I have also looked at other somewhat related questions on stack-overflow, none of which answer my problem:

Logs

Here are the logs on the client, from the moment some input was made, to just before the Exception was raised:

client > Frame(fin=True, opcode=1, data=b'entering some text here after a long delay', rsv1=False, rsv2=False, rsv3=False)
> entering some text here after a long delay
client - event = data_received(<10 bytes>)
client < Frame(fin=True, opcode=9, data=b'\xb4Y\xbd\xa2', rsv1=False, rsv2=False, rsv3=False)
client - received ping, sending pong: b459bda2
client > Frame(fin=True, opcode=10, data=b'\xb4Y\xbd\xa2', rsv1=False, rsv2=False, rsv3=False)
client > Frame(fin=True, opcode=9, data=b'\xbe\x16\x06\xe7', rsv1=False, rsv2=False, rsv3=False)
client - event = connection_lost([Errno 32] Broken pipe)
client - state = CLOSED
client x code = 1006, reason = [no reason]
client - aborted pending ping: be1606e7
client ! failing CLOSED WebSocket connection with code 1006
Error in data transfer

And here is the log from the server.

server > Frame(fin=True, opcode=9, data=b'\xb4Y\xbd\xa2', rsv1=False, rsv2=False, rsv3=False)
server ! timed out waiting for pong
server ! failing OPEN WebSocket connection with code 1011
server - state = CLOSING
server > Frame(fin=True, opcode=8, data=b'\x03\xf3', rsv1=False, rsv2=False, rsv3=False)
server x half-closing TCP connection
server ! timed out waiting for TCP close
server x closing TCP connection
server - event = connection_lost(None)
server - state = CLOSED
server x code = 1006, reason = [no reason]
server - aborted pending ping: b459bda2
Error in connection handler

I suspect the server ! timed out waiting for pong line is the important one. My understanding so far is that the client was not able to send a ping to the server, due to waiting or user input.

Upvotes: 3

Views: 2107

Answers (1)

ronrest
ronrest

Reputation: 1222

I figured it out myself.

The problem was indeed the input() function on the client. As I suspected, it was blocking the program from doing anything about the pings coming in.

I initially thought that wrapping it inside of an async coroutine function would have been enough. But it turns out it still doesn't run asynchronously if it has IO-blocking code within it.

I got around this by wrapping the input() function inside of asyncio's run_in_executor() method, which makes it run asynchronously.

Here is what I changed to make it work:

async def producer():
    return await asyncio.get_event_loop().run_in_executor(None, lambda: input("Enter something: "))

Upvotes: 4

Related Questions