Yann Bouteiller
Yann Bouteiller

Reputation: 77

Python Twisted: How to make the reactor poll a Pipe along with internet sockets?

I would like to use Twisted as a client/server manager that is part of regular Python objects. The solution I am trying to implement is to isolate Twisted in its own process using multiprocessing.Process, and communicate with this process through multiprocessing.Pipe.

I have coded the client/server logic with Twisted already, but now I am stuck at interfacing the multiprocessing.Pipe communication with the reactor.

I am a beginner with Twisted so I may be missing something obvious, but from what I understand about how reactors work, I guess the reactor is somehow supposed to poll from my multiprocessing.Pipe along with the sockets that it already seems to handle nicely. So my question is, how can I make the reactor listen to my multiprocessing.Pipe on top of what it is already doing please?

Thus far my code looks something like this:

class ServerProtocol(Protocol):

    def __init__(self, server):
        self._server = server

    def connectionMade(self):
        pass

    def connectionLost(self, reason):
        pass

    def dataReceived(self, data):
        pass


class ServerProtocolFactory(Factory):

    protocol = ServerProtocol

    def __init__(self, server):
        self.server = server

    def buildProtocol(self, addr):
        return ServerProtocol(self.server)


class Server:
    def __init__(self):
        pass

    def run(self, pipe):
        """
        This is called in its own process
        """
        from twisted.internet import reactor
        endpoint = TCP4ServerEndpoint(reactor, self._port)
        endpoint.listen(ServerProtocolFactory(self))
        reactor.run()  # main Twisted reactor loop


class MyObject:
    def __init__(self):
        self._pipe = Pipe()
        self._server = Server()
        self._p = Process(target=self._server.run, args=(self._pipe, ))
        self._p.start()

    def stop(self):
        # I want to send some stop command through the Pipe here
        self._p.join()


if __name__ == "__main__":
    obj = MyObject()
    # do stuff here
    obj.stop()

Upvotes: 0

Views: 157

Answers (1)

Jean-Paul Calderone
Jean-Paul Calderone

Reputation: 48345

I don't know if Twisted will work as run this way (i.e., as the target of a multiprocessing.Process). Let's assume it will, though.

multiprocessing.Pipe is documented as returning a two-tuple of multiprocessing.Connection objects. multiprocessing.Connection is documented as having a fileno method returning a file descriptor (or handle) used by the Connection.

If it is a file descriptor then there is probably a very easy path to integrating it with a Twisted reactor. Most Twisted reactors implement IReactorFDSet which has an addReader method which accepts an IReadDescriptor value.

Connection is not quite an IReadDescriptor but it is easily adapted to be one:

from attrs import define
from multiprocessing import Connection
from twisted.python.failure import Failure

@define
class ConnectionToDescriptor:
    _conn: Connection

    def fileno(self) -> int:
        return self._conn.fileno()

    def doRead(self) -> None:
        some_data = self._conn.recv()
        # Process some_data how you like

    def connectionLost(self, reason: Failure) -> None:
        self._conn.close()

If you wrap this around your read Connection and then pass the result to reactor.addReader the reactor will use fileno to figure out what to monitor for readiness and call doRead when there is something to read.

You could apply similar treatment to the write end of the pipe if you also want reactor-friendly support for sending bytes back to the parent process.

Upvotes: 1

Related Questions