jokoon

Reputation: 6633

How do I abort a socket.recvfrom() from another thread in python?

This looks like a duplicate of "How do I abort a socket.recv() from another thread in Python", but it is not: I want to abort recvfrom() in a thread, and the socket is UDP, not TCP.

Can this be solved with poll() or select.select()?

Upvotes: 7

Views: 3725

Answers (5)

mo FEAR

Reputation: 693

To properly close a TCP socket in Python, you have to call socket.shutdown(arg) before calling socket.close(). See the Python socket documentation, the part about shutdown.

If the socket is UDP, you can't call socket.shutdown(...); it raises an exception. And calling socket.close() alone, as with TCP, does not interrupt operations that are already blocked.

Many suggested solutions (not all) don't work, or are seen as cumbersome because they involve third-party libraries. I haven't tested poll() or select(). What definitely does work is the following:

First, create a Thread object for whatever thread is running socket.recv(), and save the handle to it. Second, import signal. signal is a standard-library module for sending and receiving Linux/POSIX signals (read its documentation). Third, to interrupt, assuming the handle to your thread is called udpThreadHandle:

signal.pthread_kill(udpThreadHandle.ident, signal.SIGINT)

and of course, in the actual thread/loop doing the receiving:

try:
    while True:
        myUdpSocket.recv(...)
except KeyboardInterrupt:
    pass

Notice that the exception handler for KeyboardInterrupt (generated by SIGINT) is OUTSIDE the receive loop. This silently terminates the receive loop and its thread.

Upvotes: 0

James Card

Reputation: 11

The solution here is to forcibly close the socket. The problem is that the method for doing this is OS-specific and Python does not do a good job of abstracting the way to do it or the consequences. Basically, you need to do a shutdown() followed by a close() on the socket. On POSIX systems such as Linux, the shutdown is the key element in forcing recvfrom to stop (a call to close() alone won't do it). On Windows, shutdown() does not affect the recvfrom and the close() is the key element. This is exactly the behavior that you would see if you were implementing this code in C and using either native POSIX sockets or Winsock sockets, so Python is providing a very thin layer on top of those calls.

On both POSIX and Windows systems, this sequence of calls results in an OSError being raised. However, the location of the exception and its details are OS-specific. On POSIX systems, the exception is raised on the call to shutdown() and the errno value of the exception is ENOTCONN (107 on Linux, "Transport endpoint is not connected"). On Windows systems, the exception is raised on the call to recvfrom() and the winerror value of the exception is set to 10038 ("An operation was attempted on something that is not a socket"). This means that there's no way to do this in an OS-agnostic way: the code has to account for both Windows and POSIX behavior and errors. Here's a simple example I wrote up:

import errno
import socket
import threading
import time

class MyServer(object):
    def __init__(self, port:int=0):
        if port == 0:
            raise AttributeError('Invalid port supplied.')

        self.port = port
        self.socket = socket.socket(family=socket.AF_INET,
                type=socket.SOCK_DGRAM)
        self.socket.bind(('0.0.0.0', port))

        self.exit_now = False

        print('Starting server.')
        self.thread = threading.Thread(target=self.run_server,
                args=[self.socket])
        self.thread.start()

    # The parameter is named sock so it does not shadow the socket module.
    def run_server(self, sock:socket.socket=None):
        if sock is None:
            raise AttributeError('No socket provided.')

        buffer_size = 4096

        while not self.exit_now:
            data = b''
            try:
                data, address = sock.recvfrom(buffer_size)
            except OSError as e:
                # winerror only exists on Windows, so read it defensively.
                if getattr(e, 'winerror', None) == 10038:
                    # Error is, "An operation was attempted on something that
                    # is not a socket".  We don't care.
                    pass
                else:
                    raise
            if len(data) > 0:
                print(f'Received {len(data)} bytes from {address}.')

    def stop(self):
        self.exit_now = True
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except OSError as e:
            if e.errno == errno.ENOTCONN:
                # Error is, "Transport endpoint is not connected"
                # (107 on Linux).  We don't care.
                pass
            else:
                raise
        self.socket.close()
        self.thread.join()
        print('Server stopped.')


if __name__ == '__main__':
    server = MyServer(5555)
    time.sleep(2)
    server.stop()
    exit(0)

Upvotes: 1

Michele d'Amico

Reputation: 23741

A good way to handle this kind of asynchronous interruption is the old C pipe trick. Create a pipe and use select/poll on both the socket and the pipe; when you want to interrupt the receiver, just write a byte to the pipe.

  • pros:
    • Can work both for UDP and TCP
    • Is protocol agnostic
  • cons:
    • select/poll on pipes is not available on Windows; there you should replace the pipe with another UDP socket used as the notification channel
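
The Windows workaround mentioned in the cons can be sketched roughly as follows. This is only an illustration, not part of the code below: UdpNotifier and recv_or_interrupt are hypothetical names, and the notifier is assumed to be a loopback UDP socket bound to an OS-chosen port, since select() on Windows accepts sockets but not pipes.

```python
import select
import socket


class UdpNotifier:
    """Hypothetical helper: a loopback UDP socket used in place of a pipe."""

    def __init__(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        self._addr = self._sock.getsockname()

    def fileno(self):
        # Lets select.select() accept this object directly.
        return self._sock.fileno()

    def notify(self):
        # Send a one-byte datagram to the notifier socket from a throwaway socket.
        sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sender.sendto(b"I", self._addr)
        finally:
            sender.close()

    def close(self):
        self._sock.close()


def recv_or_interrupt(data_sock, notifier, buffersize):
    """Return a datagram from data_sock, or None if the notifier fired."""
    readable, _, _ = select.select([data_sock, notifier], [], [])
    if notifier in readable:
        return None
    return data_sock.recv(buffersize)
```

The notification datagram is never read here, so, like the pipe version, the notifier is single-use: once notify() has been called, every later recv_or_interrupt() returns None immediately.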

Starting point

interruptable_socket.py

import os
import socket
import select


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._r_pipe, self._w_pipe = os.pipe()
        self._interrupted = False

    def bind(self):
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        read, _w, errors = select.select([self._r_pipe, self._socket], [], [self._socket])
        if self._socket in read:
            return self._socket.recv(buffersize, flags)
        return b""  # empty bytes, matching the type recv() returns

    def interrupt(self):
        self._interrupted = True
        os.write(self._w_pipe, "I".encode())

A test suite:

test_interruptable_socket.py

import socket
from threading import Timer
import time
from interruptable_socket import InterruptableUdpSocketReceiver
import unittest


class Sender(object):
    def __init__(self, destination_host, destination_port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self._dest = (destination_host, destination_port)

    def send(self, message):
        self._socket.sendto(message, self._dest)

class Test(unittest.TestCase):
    def create_receiver(self, host="127.0.0.1", port=3010):
        receiver = InterruptableUdpSocketReceiver(host, port)
        receiver.bind()
        return receiver

    def create_sender(self, host="127.0.0.1", port=3010):
        return Sender(host, port)

    def create_sender_receiver(self, host="127.0.0.1", port=3010):
        return self.create_sender(host, port), self.create_receiver(host, port)

    def test_create(self):
        self.create_receiver()

    def test_recv_async(self):
        sender, receiver = self.create_sender_receiver()
        start = time.time()
        send_message = "TEST".encode('UTF-8')
        Timer(0.1, sender.send, (send_message, )).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(message, send_message)

    def test_interrupt_async(self):
        receiver = self.create_receiver()
        start = time.time()
        Timer(0.1, receiver.interrupt).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(0, len(message))

    def test_exception_after_interrupt(self):
        sender, receiver = self.create_sender_receiver()
        receiver.interrupt()
        with self.assertRaises(RuntimeError):
            receiver.recv(128)


if __name__ == '__main__':
    unittest.main()

Evolution

This code is just a starting point. To make it more generic, we should fix the following issues:

  1. Interface: returning an empty message on interrupt is not a good design; it is better to signal it with an exception
  2. Generalization: we should have a single function to call before socket.recv(), so that extending interruption to the other recv methods becomes very simple
  3. Portability: to make porting to Windows simple, we should isolate the async notification in an object, so we can choose the right implementation for the operating system

First of all, we change test_interrupt_async() to check for an exception instead of an empty message:

from interruptable_socket import InterruptException

def test_interrupt_async(self):
    receiver = self.create_receiver()
    start = time.time()
    with self.assertRaises(InterruptException):
        Timer(0.1, receiver.interrupt).start()
        receiver.recv(128)
    elapsed = time.time()-start
    self.assertGreaterEqual(elapsed, 0.095)
    self.assertLess(elapsed, 0.11)

After this we can replace the empty return value in recv() with raise InterruptException, and the tests pass again.

The ready-to-extend version can be:

interruptable_socket.py

import os
import socket
import select


class InterruptException(Exception):
    pass


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._async_interrupt = AsyncInterrupt(self._socket)

    def bind(self):
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        self._async_interrupt.wait_for_receive()
        return self._socket.recv(buffersize, flags)

    def interrupt(self):
        self._async_interrupt.interrupt()


class AsyncInterrupt(object):
    def __init__(self, descriptor):
        self._read, self._write = os.pipe()
        self._interrupted = False
        self._descriptor = descriptor

    def interrupt(self):
        self._interrupted = True
        self._notify()

    def wait_for_receive(self):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        read, _w, errors = select.select([self._read, self._descriptor], [], [self._descriptor])
        if self._descriptor not in read:
            raise InterruptException

    def _notify(self):
        os.write(self._write, "I".encode())

Now wrapping more recv functions, implementing a Windows version, or taking care of socket timeouts becomes really simple.
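
As one possible illustration of the timeout extension, here is a minimal sketch. TimedAsyncInterrupt and its timeout parameter are hypothetical and not part of the code above; it assumes a POSIX system, where select() accepts pipe descriptors.

```python
import os
import select
import socket


class InterruptException(Exception):
    pass


class TimedAsyncInterrupt:
    """Hypothetical variant of the pipe-based notifier with a timeout."""

    def __init__(self, descriptor):
        self._read, self._write = os.pipe()
        self._descriptor = descriptor

    def interrupt(self):
        os.write(self._write, b"I")

    def wait_for_receive(self, timeout=None):
        # timeout=None blocks indefinitely, as select.select() does by default.
        readable, _, _ = select.select(
            [self._read, self._descriptor], [], [], timeout)
        if not readable:
            # Nothing became readable before the timeout expired.
            raise socket.timeout("timed out waiting for data")
        if self._read in readable:
            raise InterruptException
```

If wait_for_receive() returns normally, the descriptor is readable and the caller can go on to call recv() on it, as in the version above.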

Upvotes: 3

Gringo Suave

Reputation: 31900

Implement a quit command on the server and client sockets. Should work something like this:

Thread1: 
    status: listening
    handler: quit

Thread2: client
    exec: socket.send "quit"  ---> Thread1.socket @ host:port

Thread1: 
    status: socket closed()
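
A rough Python sketch of the outline above, assuming the control payload is the literal bytes b"quit". The names QUIT, serve and send_quit are made up for illustration.

```python
import socket
import threading

QUIT = b"quit"  # hypothetical control payload; any reserved value would do


def serve(sock, received):
    """Collect datagrams into `received` until a quit command arrives."""
    while True:
        data, _address = sock.recvfrom(4096)
        if data == QUIT:
            break
        received.append(data)
    sock.close()


def send_quit(address):
    """Ask the listener bound at `address` to stop."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
        client.sendto(QUIT, address)
```

The listener would run serve() in a threading.Thread, and any other thread can call send_quit() with the listener's address. Note that any peer able to reach the port can send the same payload, so a real server would probably want to check the sender address before honoring it.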

Upvotes: 0

Martin James

Reputation: 24867

If you want to unblock a UDP read from another thread, send it a datagram!
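
A minimal sketch of that idea, under a few assumptions: the receiver is bound to a loopback address it can send to itself, and an in-process threading.Event tells the loop that the datagram it just received is only a wake-up. StoppableReceiver is a hypothetical name.

```python
import socket
import threading


class StoppableReceiver:
    """Hypothetical wrapper: a stop flag plus a wake-up datagram."""

    def __init__(self, host="127.0.0.1", port=0):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock.bind((host, port))
        self._stop = threading.Event()
        self.packets = []
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self):
        while True:
            data, _address = self._sock.recvfrom(4096)
            if self._stop.is_set():
                # The datagram only served to unblock recvfrom().
                break
            self.packets.append(data)

    def stop(self):
        self._stop.set()
        # An empty datagram is enough to make the blocked recvfrom() return.
        self._sock.sendto(b"", self._sock.getsockname())
        self._thread.join()
        self._sock.close()
```

Because the stop decision lives in the flag rather than in the payload, a remote peer cannot shut the receiver down by sending a particular message; it can only deliver data.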

Rgds, Martin

Upvotes: 6
