Reputation: 5143
I have been asked to write a class that connects to a server, asynchronously sends the server various commands, and then provides the returned data to the client. I've been asked to do this in Python, which is a new language to me. I started digging around and found the Twisted framework, which offers some very nice abstractions (Protocol, ProtocolFactory, Reactor) that do a lot of the things I would have to do myself if I rolled my own socket-based app. It seems like the right choice given the problem that I have to solve.
I've looked through numerous examples on the web (mostly Krondo), but I still haven't seen a good example of a client that sends multiple commands across the wire while keeping the connection it created open. The server (over which I have no control), in this case, doesn't disconnect after it sends the response. So, what's the proper way to design the client so that I can tickle the server in various ways?
Right now I do this:
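from twisted.internet import reactor
from twisted.internet.protocol import Factory, Protocol
class TestProtocol(Protocol):
    def connectionMade(self):
        # transport.write() expects bytes
        self.transport.write(self.factory.message)
class TestProtocolFactory(Factory):
    # the factory builds one TestProtocol per connection
    protocol = TestProtocol
    message = b''
    def setMessage(self, msg):
        self.message = msg
def main():
    f = TestProtocolFactory()
    f.setMessage(b"my message")
    reactor.connectTCP(...)
    reactor.run()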
What I really want to do is call self.transport.write(...) via the reactor (really, call TestProtocolFactory::setMessage() on-demand from another thread of execution), not just when the connection is made.
Upvotes: 3
Views: 3114
Reputation: 2884
Twisted is an event-driven framework: by design its methods are called asynchronously, and results are delivered through Deferred objects.
That design suits protocol development well; you just have to shift your thinking away from traditional sequential programming. The Protocol class behaves like a finite state machine driven by events such as connection made, connection lost, and data received. If you recast your client code as an FSM, it will fit into the Protocol class easily.
Below is a rough example of what I mean. It is a bit rough, but it is what I can provide for now:
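from twisted.internet.protocol import Protocol
# State constants; the values are placeholders, only the names come from the sketch
I_AM_LIVING = 'living'
WAITING_DEAD = 'waiting dead'
class SyncTransport(Protocol):
    # protocol
    def dataReceived(self, data):
        print('receive data', data)
    def connectionMade(self):
        print('i made a sync connection, wow')
        self.transport.write(b'x')
        self.state = I_AM_LIVING
    def connectionLost(self, reason):
        print('i lost my sync connection, sigh')
    def send(self, data):
        if self.state == I_AM_LIVING:
            if data == b'x':
                self.transport.write(b'y')
            if data == b'Y':
                self.transport.write(b'z')
                self.state = WAITING_DEAD
        if self.state == WAITING_DEAD:
            # transports are closed with loseConnection(); there is no close()
            self.transport.loseConnection()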
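The finite-state-machine view described above can also be written as an explicit transition table, kept separate from any networking code. This is only a minimal sketch: the state names, event names, and the ClientStateMachine class are hypothetical and are not part of Twisted.

```python
# Hypothetical states and events, named only for this sketch.
TRANSITIONS = {
    ("connecting", "connection_made"): "waiting_reply",
    ("waiting_reply", "data_received"): "idle",
    ("idle", "command_sent"): "waiting_reply",
    ("waiting_reply", "connection_lost"): "closed",
    ("idle", "connection_lost"): "closed",
}


class ClientStateMachine:
    """Tracks the current state and rejects events that are not allowed in it."""

    def __init__(self):
        self.state = "connecting"

    def fire(self, event):
        key = (self.state, event)
        if key not in TRANSITIONS:
            raise ValueError("event %r not allowed in state %r" % (event, self.state))
        self.state = TRANSITIONS[key]
        return self.state
```

A Protocol subclass could call fire() from connectionMade, dataReceived, and connectionLost, so that an unexpected event fails loudly instead of being silently ignored.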
Upvotes: 0
Reputation: 9161
You may want to use a Service.
Services are pieces of functionality within a Twisted app which are started and stopped, and are nice abstractions for other parts of your code to interact with. For example, in this case you might have a SayStuffToServerService (I know, terrible name, but without knowing more about its job it was the best I could do here :) ) that exposed something like this:
class SayStuffToServerService:
    def __init__(self, host, port):
        # this is the host and port to connect to
        pass
    def sendToServer(self, whatToSend):
        # send some line to the remote server
        pass
    def startService(self):
        # call me before using the service. starts outgoing connection efforts.
        pass
    def stopService(self):
        # clean reactor shutdowns should call this method. stops outgoing
        # connection efforts.
        pass
(That might be all the interface you need, but it should be fairly clear where you can add things to this.)
The startService() and stopService() methods here are just what Twisted's Services expose. And helpfully, there is a premade Twisted Service which acts like a TCP client and takes care of all the reactor stuff for you. It's twisted.application.internet.TCPClient, which takes arguments for a remote host and port, along with a ProtocolFactory to take care of handling the actual connection attempt.
Here is the SayStuffToServerService, implemented as a subclass of TCPClient:
from twisted.application import internet
class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory
    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)
    def sendToServer(self, whatToSend):
        # we'll do stuff here
        pass
(See below for the SayStuffToServerProtocolFactory.)
Using this Service architecture is convenient in a lot of ways; you can group Services together in one container, so that they all get stopped and started as one when you have different parts of your app that you want active. It may make good sense to implement other parts of your app as separate Services. You can set Services as child services to application - the magic name that twistd looks for in order to know how to initialize, daemonize, and shut down your app. Actually yes, let's add some code to do that now.
from twisted.application import service
...
application = service.Application('say-stuff')
sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))
That's all. Now when you run this module under twistd (i.e., for debugging, twistd -noy saystuff.py), that application will be started under the right reactor, and it will in turn start the SayStuffToServerService, which will start a connection effort to localhost:65432, which will use the service's factory attribute to set up the connection and the Protocol. You don't need to call reactor.run() or attach things to the reactor yourself anymore.
So we haven't implemented SayStuffToServerProtocolFactory yet. Since it sounds like you would prefer that your client reconnect if it has lost the connection (so that callers of sendToServer can usually just assume that there's a working connection), I'm going to put this protocol factory on top of ReconnectingClientFactory.
from twisted.internet import protocol
class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol
This is a pretty nice minimal definition, which will keep trying to make outgoing TCP connections to the host and port we specified, and instantiate a SayStuffToServerProtocol each time. When we fail to connect, this class will do nice, well-behaved exponential backoff so that your network doesn't get hammered (you can set a maximum wait time). It will be the responsibility of the Protocol to assign to _my_live_proto and call this factory's resetDelay() method, so that exponential backoff will continue to work as expected. And here is that Protocol now:
from twisted.protocols import basic
class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self
    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory
    def sayStuff(self, stuff):
        self.sendLine(stuff)
    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass
This is implemented on top of twisted.protocols.basic.LineReceiver, but would work as well with any other sort of Protocol, in case your protocol isn't line-oriented.
The only thing left is hooking up the Service to the right Protocol instance. This is why the Factory keeps a _my_live_proto attribute, which should be set when a connection is successfully made, and cleared (set to None) when that connection is lost. Here's the new implementation of SayStuffToServerService.sendToServer:
class NotConnectedError(Exception):
    pass
class SayStuffToServerService(internet.TCPClient):
    ...
    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)
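Raising NotConnectedError is only one possible behavior for the disconnected case. Another option is to hold messages until a connection comes back. The sketch below shows that idea in isolation; BufferingSender, its method names, and the max_pending limit are hypothetical, and it assumes only that the live protocol object has the sayStuff() method defined above.

```python
from collections import deque


class BufferingSender:
    """Hypothetical helper: hold messages while there is no live connection."""

    def __init__(self, max_pending=100):
        self._proto = None
        # Bounded queue: once full, the oldest pending message is dropped.
        self._pending = deque(maxlen=max_pending)

    def connected(self, proto):
        """Call when a connection is made; flushes anything queued meanwhile."""
        self._proto = proto
        while self._pending:
            proto.sayStuff(self._pending.popleft())

    def disconnected(self):
        """Call when the connection is lost."""
        self._proto = None

    def send(self, what):
        if self._proto is None:
            self._pending.append(what)
        else:
            self._proto.sayStuff(what)
```

Whether silently dropping the oldest message is acceptable depends on the application; raising an error when the queue is full would be an equally reasonable choice.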
And now to tie it all together in one place:
from twisted.application import internet, service
from twisted.internet import protocol
from twisted.protocols import basic
class SayStuffToServerProtocol(basic.LineReceiver):
    def connectionMade(self):
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self
    def connectionLost(self, reason):
        self.factory._my_live_proto = None
        del self.factory
    def sayStuff(self, stuff):
        self.sendLine(stuff)
    def lineReceived(self, line):
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass
class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    _my_live_proto = None
    protocol = SayStuffToServerProtocol
class NotConnectedError(Exception):
    pass
class SayStuffToServerService(internet.TCPClient):
    factoryclass = SayStuffToServerProtocolFactory
    def __init__(self, host, port):
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)
    def sendToServer(self, whatToSend):
        if self.factory._my_live_proto is None:
            # define here whatever behavior is appropriate when there is no
            # current connection (in case the client can't connect or
            # reconnect)
            raise NotConnectedError
        self.factory._my_live_proto.sayStuff(whatToSend)
application = service.Application('say-stuff')
sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))
Hopefully that gives enough of a framework with which to start. There is sometimes a lot of plumbing to do to handle client disconnections just the way you want, or to handle out-of-order responses from the server, or handle various sorts of timeout, canceling pending requests, allowing multiple pooled connections, etc, etc, but this should help.
Upvotes: 4
Reputation: 7878
Depends. Here are some possibilities:
I'm assuming
Approach 1. You have a list of commands to send the server, and for some reason can't do them all at once. In that case send a new one as the previous answer returns:
class proto(parentProtocol):
    def stringReceived(self, data):
        self.handle_server_response(data)
        next_command = self.command_queue.pop()
        # do stuff
Approach 2. What you send to the server is based on what the server sends you:
class proto(parentProtocol):
    def stringReceived(self, data):
        if data == "this":
            self.sendString("that")
        elif data == "foo":
            self.sendString("bar")
        # and so on
Approach 3. You don't care what the server sends you; you just want to periodically send some commands:
from twisted.internet import task
class proto(parentProtocol):
    def callback(self):
        next_command = self.command_queue.pop()
        # do stuff
    def connectionMade(self):
        # run self.callback once per second for as long as the loop is running
        self.task_id = task.LoopingCall(self.callback)
        self.task_id.start(1.0)
Approach 4: Your edit now mentions triggering from another thread. Feel free to check the Twisted documentation to find out whether proto.sendString is threadsafe. You may be able to call it directly, but I don't know. Approach 3 is threadsafe though. Just fill the queue (which is threadsafe) from another thread.
Basically you can store any amount of state in your protocol; it will stay around until you are done. Then you either send commands to the server in response to its messages to you, or you set up some scheduling to do your stuff. Or both.
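On the thread question in Approach 4: Twisted's documented way to run something in the reactor thread from another thread is reactor.callFromThread, since transport writes are generally not safe to call from other threads directly. A minimal sketch, assuming proto is a connected protocol with a sendString method as in the approaches above; the helper name send_from_thread is hypothetical.

```python
def send_from_thread(reactor, proto, payload):
    """Ask the reactor thread to send `payload`; safe to call from any thread.

    `reactor` is normally `twisted.internet.reactor`. It is passed in as an
    argument here so that the helper is easy to exercise with a stand-in.
    """
    # callFromThread only schedules the call; the actual write happens later,
    # inside the reactor thread.
    reactor.callFromThread(proto.sendString, payload)


# Typical use from a worker thread (not executed here):
#     from twisted.internet import reactor
#     send_from_thread(reactor, proto, b"some command")
```

Because the call is only scheduled, the worker thread gets no return value back; results would still arrive through the protocol's stringReceived in the reactor thread.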
Upvotes: 4