macdjord
macdjord

Reputation: 555

How to detect client disconnection when servicing a streaming endpoint with http.server

We have a simple web server written using Python's http.server module. One of its endpoints serves streaming data - a series of events, with new events sent as they occur. We've successfully set up this endpoint so that it delivers the data as expected. However, if the client disconnects, the server does not detect this, and will continue indefinitely to deliver data down a connection that no one is listening to.

The relevant bit of code:

import json
import queue
import http.server

from common.log import log

logger = log.get_logger()

class RemoteHTTPHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler that includes an endpoint streaming events to the client."""

    ...

    def __stream_events(self, start_after_event_id: int) -> None:
        """Stream events starting after the given ID and continuing as new events become available.

        Sends a 200 response, then writes each event as indented JSON followed by a
        ``---`` separator line. While no event is available, a blank line is written
        about once per second as a keep-alive. Returns when the server's stop event
        is set, or when a write fails because the connection was lost.

        Args:
            start_after_event_id: Only events after this ID are streamed.
        """

        # Get a queue of events, which will include all existing events from the given starting point,
        # and be updated with new events as they become available
        logger.info(f"Streaming events from ID {start_after_event_id}")
        with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
            self.send_response(200)
            self.send_header("Content-type", "application/yaml; charset=utf-8")
            self.send_header("Connection", "close")
            self.end_headers()

            # If the server is shutting down, all ongoing streams should terminate
            while not self._stop_streams_event.is_set():
                # Get the next event;
                # if the queue is empty, will block until an event is added, up to a maximum of 1s
                try:
                    data = events_queue.get(timeout=1)
                except queue.Empty:
                    # Send an empty line to keep the HTTP connection from timing out
                    message = b"\n"
                else:
                    # Send the encoded event plus the separator line
                    message = json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n"

                try:
                    self.wfile.write(message)
                except ConnectionError as ex:
                    # A lost connection does not always surface as BrokenPipeError:
                    # a peer reset raises ConnectionResetError and a locally aborted
                    # connection raises ConnectionAbortedError. ConnectionError is the
                    # common base class of all three, so every variant ends the stream
                    # here instead of escaping the handler.
                    # NOTE: a write only fails once the OS knows the peer is gone; if an
                    # intermediary keeps the connection open, no error is raised.
                    logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
                    return


def serve() -> None:
    """Serve HTTP requests on port 8090 (all interfaces), one thread per request.

    Blocks inside ``serve_forever()``. The server is used as a context manager so
    that its listening socket is closed when ``serve_forever()`` exits, including
    when it exits by exception (e.g. ``KeyboardInterrupt``).
    """
    with http.server.ThreadingHTTPServer(("", 8090), RemoteHTTPHandler) as httpd:
        httpd.serve_forever()

I wrote this under the expectation that, once the client closed the connection, calling self.wfile.write() would raise BrokenPipeError (possibly after some delay for the TCP connection to time out). However, this does not happen; the server will continue to happily write events to a connection no one is listening to without any error for at least 20 minutes.

What is the correct way to check if the client is still there and listening?

Edit: Per @Saxtheowl's suggested solution, I tried setting the socket timeout and using select() to check if the socket was writable:

import http.server
import json
import queue
import select
import socket

from common.log import log

logger = log.get_logger()


class RemoteHTTPHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler that includes an endpoint streaming events to the client."""

    ...

    def _peer_has_closed(self) -> bool:
        """Return True if the client has closed (or reset) its end of the connection.

        A closed peer makes the socket *readable*: ``select()`` reports it ready and
        a read returns zero bytes. ``MSG_PEEK`` is used so that any real data the
        client may have sent is left in the receive buffer.

        A client that only shuts down its sending side (half-close) is also
        reported as closed.

        NOTE(review): assumes ``self.connection`` is a plain TCP socket;
        ``ssl.SSLSocket.recv()`` rejects non-zero flags - confirm if TLS is ever used.
        """
        readable, __, __ = select.select([self.connection], [], [], 0)
        if not readable:
            # Nothing pending on the read side: no EOF has arrived
            return False
        try:
            return self.connection.recv(1, socket.MSG_PEEK) == b""
        except ConnectionError:
            # The peer reset the connection
            return True

    def __stream_events(self, start_after_event_id: int) -> None:
        """Stream events starting after the given ID and continuing as new events become available.

        Sends a 200 response, then writes each event as indented JSON followed by a
        ``---`` separator line. While no event is available, a blank line is written
        about once per second as a keep-alive. Returns when the server's stop event
        is set, when the client is found to have closed the connection, when the
        socket stays unwritable for 3 seconds, or when a write fails with a
        connection error or timeout. Any other exception is logged and re-raised.

        Args:
            start_after_event_id: Only events after this ID are streamed.
        """

        # Get a queue of events, which will include all existing events from the given starting point,
        # and be updated with new events as they become available
        logger.info(f"Streaming events from ID {start_after_event_id}")
        with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
            # Send the headers
            self.send_response(200)
            self.send_header("Content-type", "application/yaml; charset=utf-8")
            self.send_header("Connection", "close")
            self.end_headers()

            # Set a timeout on the underlying socket
            self.connection.settimeout(2)

            # If the server is shutting down, all ongoing streams should terminate
            while not self._stop_streams_event.is_set():
                try:
                    # Get the next event;
                    # if the queue is empty, will block until an event is added, up to a maximum of 1s
                    message: bytes
                    try:
                        data = events_queue.get(timeout=1)
                    except queue.Empty:
                        # Send an empty line to keep the HTTP connection from timing out
                        logger.debug("Sending blank line")  # FIXME: For test
                        message = b"\n"
                    else:
                        # Send the encoded event plus the separator line
                        logger.debug("Sending event")  # FIXME: For test
                        message = json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n"

                    # Check the read side for EOF. Writability alone says nothing about
                    # the peer: a socket stays writable as long as the local send buffer
                    # has room, even after the client has gone away.
                    logger.debug("Connection check")  # FIXME: For test
                    if self._peer_has_closed():
                        logger.info("Connection closed")
                        return

                    # Wait up to 3s for room in the send buffer; a client that accepts
                    # no data for that long is treated as gone
                    write_ready_fds: list[socket.socket]
                    __, write_ready_fds, __ = select.select([], [self.connection], [], 3)
                    if not write_ready_fds:
                        logger.info("Connection closed")
                        return

                    logger.debug("Still connected; sending")  # FIXME: For test
                    self.wfile.write(message)
                except (ConnectionError, TimeoutError) as ex:
                    # A connection error (broken pipe, reset, abort) or socket timeout
                    # means the connection was lost, either because the client closed it
                    # or there was a network error
                    logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
                    return
                except Exception as ex:
                    # Unexpected error; KeyboardInterrupt/SystemExit are deliberately not
                    # caught here so that they are not reported as send failures
                    logger.exception(f"Error sending events: {type(ex).__name__}: {ex}")
                    raise

However, this did not help; even several minutes after the client disconnected, no timeout errors were raised and select() still returned immediately claiming the socket was writable.

Upvotes: 2

Views: 267

Answers (0)

Related Questions