Reputation: 1222
I am trying to run a websocket client that continuously takes user input from the terminal and sends it to the server.
It works perfectly fine as long as the person is entering content on a regular basis. But the following error message is thrown if the user fails to send a message for more than about 20 seconds.
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason
I suspect it might be because the input() function that waits for user input is a blocking function. Perhaps this blocks the client from pinging the server?
I have tried to wrap the input() function that waits for user input inside of an asynchronous function. But I still get the same issue.
How do I get around this issue?
Here is the code I have used.
CLIENT
import asyncio
import websockets

async def producer():
    return input("Enter something: ")

async def echo_loop():
    uri = f"wss://192.168.1.2:5432"
    async with websockets.connect(uri, ssl=None) as websocket:
        while True:
            # Get user input
            # msg = input("Enter something: ")
            msg = await producer()

            # Send message to the server
            await websocket.send(msg)
            print(f"> {msg}")

            # Get feedback from server
            feedback = await websocket.recv()
            print(f"< {feedback}")

asyncio.get_event_loop().run_until_complete(echo_loop())
asyncio.get_event_loop().run_forever()
SERVER
import asyncio
import websockets

async def echo(websocket, path):
    async for msg in websocket:
        # Send Feedback of message to client
        feedback = f"RECEIVED: {msg}!"
        await websocket.send(feedback)
        print(f"> {feedback}")

start_server = websockets.serve(echo, host=None, port=5432, ssl=None)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
As per the documentation FAQ, I am aware that the ConnectionClosedError: code = 1006 can sometimes occur when there is a network issue, e.g. if:
However, this is not an internet or proxy issue that is causing it.
I have also looked at other somewhat related questions on Stack Overflow, none of which answer my problem:
Here are the logs on the client, from the moment some input was made, to just before the Exception was raised:
client > Frame(fin=True, opcode=1, data=b'entering some text here after a long delay', rsv1=False, rsv2=False, rsv3=False)
> entering some text here after a long delay
client - event = data_received(<10 bytes>)
client < Frame(fin=True, opcode=9, data=b'\xb4Y\xbd\xa2', rsv1=False, rsv2=False, rsv3=False)
client - received ping, sending pong: b459bda2
client > Frame(fin=True, opcode=10, data=b'\xb4Y\xbd\xa2', rsv1=False, rsv2=False, rsv3=False)
client > Frame(fin=True, opcode=9, data=b'\xbe\x16\x06\xe7', rsv1=False, rsv2=False, rsv3=False)
client - event = connection_lost([Errno 32] Broken pipe)
client - state = CLOSED
client x code = 1006, reason = [no reason]
client - aborted pending ping: be1606e7
client ! failing CLOSED WebSocket connection with code 1006
Error in data transfer
And here is the log from the server.
server > Frame(fin=True, opcode=9, data=b'\xb4Y\xbd\xa2', rsv1=False, rsv2=False, rsv3=False)
server ! timed out waiting for pong
server ! failing OPEN WebSocket connection with code 1011
server - state = CLOSING
server > Frame(fin=True, opcode=8, data=b'\x03\xf3', rsv1=False, rsv2=False, rsv3=False)
server x half-closing TCP connection
server ! timed out waiting for TCP close
server x closing TCP connection
server - event = connection_lost(None)
server - state = CLOSED
server x code = 1006, reason = [no reason]
server - aborted pending ping: b459bda2
Error in connection handler
I suspect the "server ! timed out waiting for pong" line is the important one. My understanding so far is that the client was not able to answer the server's ping with a pong in time, because it was stuck waiting for user input.
Upvotes: 3
Views: 2107
Reputation: 1222
I figured it out myself.
The problem was indeed the input() function on the client. As I suspected, it was blocking the program from doing anything about the pings coming in.
I initially thought that wrapping it inside of an async coroutine function would have been enough. But it turns out it still doesn't run asynchronously if it has IO-blocking code within it.
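To illustrate that point, here is a minimal sketch that does not use websockets at all. The names blocking_producer, heartbeat and count_ticks_during are made up for this example, time.sleep() stands in for input(), and the heartbeat task stands in for the background keepalive handling. While the blocking call runs inside the coroutine, the heartbeat does not get a single turn:

```python
import asyncio
import time


async def blocking_producer():
    # Hypothetical stand-in for input(): time.sleep() blocks the whole thread,
    # and declaring the function "async" does not change that.
    time.sleep(0.3)
    return "typed text"


async def heartbeat(ticks):
    # Stand-in for background work such as answering pings.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def count_ticks_during(producer):
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    await producer()        # the event loop is stuck here for 0.3 s
    during = len(ticks) - before
    task.cancel()
    return during


blocked_ticks = asyncio.run(count_ticks_during(blocking_producer))
print(f"heartbeat ticks while blocked: {blocked_ticks}")
```

The coroutine never reaches an await that yields control while it is sleeping, so nothing else scheduled on the loop can run until it returns.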
I got around this by wrapping the input() function inside of asyncio's run_in_executor() method, which runs it in a separate thread so the event loop stays free to handle pings in the meantime.
Here is what I changed to make it work:
async def producer():
    return await asyncio.get_event_loop().run_in_executor(None, lambda: input("Enter something: "))
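For anyone who wants to see the effect without a websocket server, here is a small self-contained sketch of the same idea. The helper names slow_input and heartbeat are made up for this example, and time.sleep() stands in for input() so it runs without a terminal. The heartbeat task plays the role of the ping handling and keeps ticking while the blocking call sits in the executor thread:

```python
import asyncio
import time


def slow_input():
    # Hypothetical stand-in for input(): blocks its own thread for 0.3 s.
    time.sleep(0.3)
    return "typed text"


async def producer():
    # Passing None selects the loop's default thread pool executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, slow_input)


async def heartbeat(ticks):
    # Stand-in for background work such as answering pings.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main():
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    msg = await producer()  # the loop keeps running while the thread blocks
    task.cancel()
    return msg, len(ticks)


msg, tick_count = asyncio.run(main())
print(f"got {msg!r}, heartbeat ticked {tick_count} times meanwhile")
```

On Python 3.9 or newer, asyncio.to_thread(input, "Enter something: ") should be an equivalent shorthand for the run_in_executor() call.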
Upvotes: 4