Reputation: 555
We have a simple web server written in Python's http.server
. One of its endpoints serves streaming data - a series of events, with new events sent as they occur. We've successfully set up this endpoint so that it delivers the data as expected. However, if the client disconnects, the server does not detect this, and will continue delivering data down a connection no one's listening to indefinitely.
The relevant bit of code:
import json
import queue
import http.server
from common.log import log
logger = log.get_logger()
class RemoteHTTPHandler(http.server.BaseHTTPRequestHandler):
...
def __stream_events(self, start_after_event_id: int) -> None:
"""Stream events starting after the given ID and continuing as new events become available"""
# Get a queue of events, which will include all existing events from the given starting point,
# and be updated with new events as they become available
logger.info(f"Streaming events from ID {start_after_event_id}")
with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
self.send_response(200)
self.send_header("Content-type", "application/yaml; charset=utf-8")
self.send_header("Connection", "close")
self.end_headers()
# If the server is shutting down, all ongoing streams should terminate
while not self._stop_streams_event.is_set():
try:
# Get the next event;
# if the queue is empty, will block until an event is added, up to a maximum of 1s
try:
data = events_queue.get(timeout=1)
except queue.Empty:
# Send an empty line to keep the HTTP connection from timing out
self.wfile.write(b"\n")
continue
# Send the encoded event plus the seperator line
self.wfile.write(json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n")
except BrokenPipeError as ex:
# TODO: This does not reliably detect loss of connection
# Broken pipe means the connection was lost,
# either because the client closed it or there was a network error
logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
return
def serve():
http.server.ThreadingHTTPServer(("", 8090), RemoteHTTPHandler).serve_forever()
I wrote this under the expectation that, once the client closed the connection, calling self.wfile.write()
would raise BrokenPipeError (possibly after some delay for the TCP connection to time out). However, this does not happen; the server will continue to happily write events to a connection no one is listening to without any error for at least 20 minutes.
What is the correct way to check if the client is still there and listening?
Edit:
Per @Saxtheowl's suggested solution, I tried setting the socket timeout and using select()
to check if the socket was writable:
import http.server
import json
import queue
import select
import socket
from common.log import log
logger = log.get_logger()
class RemoteHTTPHandler(http.server.BaseHTTPRequestHandler):
...
def __stream_events(self, start_after_event_id: int) -> None:
"""Stream events starting after the given ID and continuing as new events become available"""
# Get a queue of events, which will include all existing events from the given starting point,
# and be updated with new events as they become available
logger.info(f"Streaming events from ID {start_after_event_id}")
with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
# Send the headers
self.send_response(200)
self.send_header("Content-type", "application/yaml; charset=utf-8")
self.send_header("Connection", "close")
self.end_headers()
# Set a timeout on the underlying socket
self.connection.settimeout(2)
# If the server is shutting down, all ongoing streams should terminate
while not self._stop_streams_event.is_set():
try:
# Get the next event;
# if the queue is empty, will block until an event is added, up to a maximum of 1s
message: bytes
try:
data = events_queue.get(timeout=1)
except queue.Empty:
# Send an empty line to keep the HTTP connection from timing out
logger.debug(f"Sending blank line") # FIXME: For test
message = b"\n"
else:
# Send the encoded event plus the seperator line
logger.debug(f"Sending event") # FIXME: For test
message = json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n"
# Check if the client is still connected using select
write_ready_fds: list[socket.socket]
logger.debug(f"Connection check") # FIXME: For test
__, write_ready_fds, __ = select.select([], [self.connection], [], 3)
if not write_ready_fds:
logger.info(f"Connection closed")
return
logger.debug(f"Still connected; sending") # FIXME: For test
self.wfile.write(message)
except (BrokenPipeError, TimeoutError) as ex:
# Broken pipe or socket timeout means the connection was lost,
# either because the client closed it or there was a network error
logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
return
except BaseException as ex:
# Unexpected error
logger.exception(f"Error sending events: {type(ex).__name__}: {ex}")
raise
However, this did not help; even several minutes after the client disconnected, no timeout errors were raised and select()
still returned immediately claiming the socket was writable.
Upvotes: 2
Views: 267