jdt141
jdt141

Reputation: 5143

How to properly trigger a python twisted transport?

I have been asked to write a class that connects to a server, asynchronously sends the server various commands, and then provides the returned data to the client. I've been asked to do this in Python, which is a new language to me. I started digging around and found the Twisted framework which offers some very nice abstractions (Protocol, ProtocolFactory, Reactor) that do a lot of the things that I would have to do if I would roll my own socket-based app. It seems like the right choice given the problem that I have to solve.

I've looked through numerous examples on the web (mostly Krondo), but I still haven't seen a good example of creating a client that will send multiple commands across the wire and I maintain the connection I create. The server (of which I have no control over), in this case, doesn't disconnect after it sends the response. So, what's the proper way to design the client so that I can tickle the server in various ways?

Right now I do this:

class TestProtocol(Protocol)
    def connectionMade(self):
         self.transport.write(self.factory.message)

class TestProtocolFactory(Factory):
    message = ''
    def setMessage(self, msg):
        self.message = msg

def main():
    f = TestProtocolFactory()
    f.setMessage("my message")
    reactor.connectTCP(...)
    reactor.run()

What I really want to do is call self.transport.write(...) via the reactor (really, call TestProtocolFactory::setMessage() on-demand from another thread of execution), not just when the connection is made.

Upvotes: 3

Views: 3114

Answers (3)

Houcheng
Houcheng

Reputation: 2884

The twisted framework is event-based programming; and by nature, its method is all called in async, and result is get by defer object.

The framework's nature is approprivate for protocol developing, just you have to change your minding from traditional sequential programming. The Protocol class is like a finite state machine with events like: connection make, connection lost, receive data. You can convert your client code into FSM and then will be easily to fit into the Protocol class.

Below is an rough example of what I want to express. A bit of rouge, but this is i can provide now:

class SyncTransport(Protocol):
    # protocol
    def dataReceived(self, data):
        print 'receive data', data
    def connectionMade(self):
        print 'i made a sync connection, wow'
        self.transport.write('x')
        self.state = I_AM_LIVING
    def connectionLost(self):
        print 'i lost my sync connection, sight'
    def send(self, data):
        if self.state == I_AM_LIVING:
            if data == 'x':
              self.transport.write('y')
           if data == 'Y':
              self.transport.write('z')
              self.state = WAITING_DEAD
        if self.state == WAITING_DEAD:
              self.transport.close()

Upvotes: 0

the paul
the paul

Reputation: 9161

You may want to use a Service.

Services are pieces of functionality within a Twisted app which are started and stopped, and are nice abstractions for other parts of your code to interact with. For example, in this case you might have a SayStuffToServerService (I know, terrible name, but without knowing more about its job it was the best I could do here :) ) that exposed something like this:

class SayStuffToServerService:
    def __init__(self, host, port):
        # this is the host and port to connect to

    def sendToServer(self, whatToSend):
        # send some line to the remote server

    def startService(self):
        # call me before using the service. starts outgoing connection efforts.

    def stopService(self):
        # clean reactor shutdowns should call this method. stops outgoing
        # connection efforts.

(That might be all the interface you need, but it should be fairly clear where you can add things to this.)

The startService() and stopService() methods here are just what Twisted's Services expose. And helpfully, there is a premade Twisted Service which acts like a TCP client and takes care of all the reactor stuff for you. It's twisted.application.internet.TCPClient, which takes arguments for a remote host and port, along with a ProtocolFactory to take care of handling the actual connection attempt.

Here is the SayStuffToServerService, implemented as a subclass of TCPClient:

from twisted.application import internet

class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)

    def sendToServer(self, whatToSend):
        # we'll do stuff here

(See below for the SayStuffToServerProtocolFactory.)

Using this Service architecture is convenient in a lot of ways; you can group Services together in one container, so that they all get stopped and started as one when you have different parts of your app that you want active. It may make good sense to implement other parts of your app as separate Services. You can set Services as child services to application- the magic name that twistd looks for in order to know how to initialize, daemonize, and shut down your app. Actually yes, let's add some code to do that now.

from twisted.application import service

...

application = service.Application('say-stuff')

sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

That's all. Now when you run this module under twistd (i.e., for debugging, twistd -noy saystuff.py), that application will be started under the right reactor, and it will in turn start the SayStuffToServerService, which will start a connection effort to localhost:65432, which will use the service's factory attribute to set up the connection and the Protocol. You don't need to call reactor.run() or attach things to the reactor yourself anymore.

So we haven't implemented SayStuffToServerProtocolFactory yet. Since it sounds like you would prefer that your client reconnect if it has lost the connection (so that callers of sendToServer can usually just assume that there's a working connection), I'm going to put this protocol factory on top of ReconnectingClientFactory.

from twisted.internet import protocol

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol

This is a pretty nice minimal definition, which will keep trying to make outgoing TCP connections to the host and port we specified, and instantiate a SayStuffToServerProtocol each time. When we fail to connect, this class will do nice, well-behaved exponential backoff so that your network doesn't get hammered (you can set a maximum wait time). It will be the responsibility of the Protocol to assign to _my_live_proto and call this factory's resetDelay() method, so that exponential backoff will continue to work as expected. And here is that Protocol now:

class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        self.sendLine(stuff)

    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

This is implemented on top of twisted.protocols.basic.LineReceiver, but would work as well with any other sort of Protocol, in case your protocol isn't line-oriented.

The only thing left is hooking up the Service to the right Protocol instance. This is why the Factory keeps a _my_live_proto attribute, which should be set when a connection is successfully made, and cleared (set to None) when that connection is lost. Here's the new implementation of SayStuffToServerService.sendToServer:

class NotConnectedError(Exception):
    pass

class SayStuffToServerService(internet.TCPClient):

    ...

    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)

And now to tie it all together in one place:

from twisted.application import internet, service
from twisted.internet import protocol
from twisted.protocols import basic

class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        self.sendLine(stuff)

    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol

class NotConnectedError(Exception):
    pass

class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)

    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)

application = service.Application('say-stuff')

sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

Hopefully that gives enough of a framework with which to start. There is sometimes a lot of plumbing to do to handle client disconnections just the way you want, or to handle out-of-order responses from the server, or handle various sorts of timeout, canceling pending requests, allowing multiple pooled connections, etc, etc, but this should help.

Upvotes: 4

porgarmingduod
porgarmingduod

Reputation: 7878

Depends. Here are some possibilities:

I'm assuming

Approach 1. You have a list of commands to send the server, and for some reason can't do them all at once. In that case send a new one as the previous answer returns:

class proto(parentProtocol):
    def stringReceived(self, data):
        self.handle_server_response(data)
        next_command = self.command_queue.pop()
        # do stuff

Approach 2. What you send to the server is based on what the server sends you:

class proto(parentProtocol):
    def stringReceived(self, data):
        if data == "this":
            self.sendString("that")
        elif data == "foo":
            self.sendString("bar")
        # and so on

Approach 3. You don't care what the server sends to, you just want to periodically send some commands:

class proto(parentProtocol):
    def callback(self):
        next_command = self.command_queue.pop()
        # do stuff
    def connectionMade(self):
        from twisted.internet import task
        self.task_id = task.LoopingCall(self.callback)
        self.task_id.start(1.0)

Approach 4: Your edit now mentions triggering from another thread. Feel free to check the twisted documentation to find out if proto.sendString is threadsafe. You may be able to call it directly, but I don't know. Approach 3 is threadsafe though. Just fill the queue (which is threadsafe) from another thread.

Basically you can store any amount of state in your protocol; it will stay around until you are done. The you either send commands to the server as a response to it's messages to you, or you set up some scheduling to do your stuff. Or both.

Upvotes: 4

Related Questions