Peter Gibson

Pattern for reading from a transport in Twisted Python

In Twisted Python, data is written to a Protocol's transport but received by overriding the Protocol's dataReceived method. Is there a pattern for reading from a transport? This would be helpful when implementing protocol state with inlineCallbacks.

For example:

from twisted.internet import defer
from twisted.internet.protocol import Protocol

class SomeProtocol(Protocol):
    @defer.inlineCallbacks
    def login(self):
        self.transport.write(b'login')
        resp = yield self.transport.read(5, timeout=1)  # this doesn't exist
        if resp != b'user:':
            raise SomeException()
        self.transport.write(b'admin')
        resp = yield self.transport.read(9, timeout=1)  # nor does this
        if resp != b'password:':
            raise SomeException()
        self.transport.write(b'hunter2')
        # ... etc

Upvotes: 0

Answers (2)

Peter Gibson

I ended up maintaining a list of Deferreds to fire as data arrives, and buffering incoming data until there is enough to satisfy the count requested by the first Deferred in the list.

from twisted.internet import defer
from twisted.internet.protocol import Protocol


class TimeoutException(Exception):
    """Delivered via errback when a read does not complete in time."""


class SomeProtocol(Protocol):

    def __init__(self, reactor=None):
        if reactor is None:
            from twisted.internet import reactor
        self.reactor = reactor
        self.buf = b''      # bytes received but not yet handed to a reader
        self.readers = []   # pending reads, oldest first: [deferred, count, timeout_call]

    def deferred_read(self, count, timeout=None):
        """Return a Deferred that fires with `count` bytes once they have arrived."""
        d = defer.Deferred()
        reader = [d, count, None]
        if timeout is not None:
            reader[2] = self.reactor.callLater(timeout, self.deferred_read_timeout, reader)
        self.readers.append(reader)
        self.check_readers()
        return d

    def deferred_read_timeout(self, reader):
        """Time out this reader, then check whether later readers can now be satisfied."""
        d, count, timeout_call = reader
        self.readers.remove(reader)
        d.errback(TimeoutException())
        self.check_readers()

    def check_readers(self):
        """Hand buffered data to readers in order while the first one can be satisfied."""
        while self.readers:
            d, count, timeout_call = self.readers[0]
            if len(self.buf) < count:
                break
            data, self.buf = self.buf[:count], self.buf[count:]
            self.readers.pop(0)
            if timeout_call is not None:
                timeout_call.cancel()  # satisfied in time, so drop the pending timeout
            d.callback(data)

    def dataReceived(self, data):
        self.buf += data
        self.check_readers()

It currently requires count to be non-zero. It would be good to extend it to support returning whatever is currently in the read buffer, and to support reading with a timeout but no count, so that whatever is in the buffer when the timeout expires is returned.
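One way the extension described above might look is sketched below. To keep it runnable without Twisted, plain callbacks stand in for Deferreds, and the scheduler is injected as a `call_later(delay, fn, *args)` function returning an object with `cancel()`, the same shape as `reactor.callLater`. The class name `BufferedReader`, and the choice of `count=0` for "whatever is buffered now" and `count=None` for "whatever is buffered when the timeout fires", are assumptions rather than part of the answer above.

```python
class BufferedReader:
    """Hypothetical, framework-free version of the reader queue above.

    count=0    -> whatever is buffered once this reader reaches the front
    count=None -> (requires a timeout) whatever is buffered when it expires
    """

    def __init__(self, call_later):
        # call_later(delay, fn, *args) must return an object with .cancel();
        # Twisted's reactor.callLater has this shape.
        self.call_later = call_later
        self.buf = b""
        self.readers = []  # each entry: [on_data, on_timeout, count, delayed_call]

    def read(self, on_data, on_timeout, count=None, timeout=None):
        if count is None and timeout is None:
            raise ValueError("count=None only makes sense with a timeout")
        reader = [on_data, on_timeout, count, None]
        if timeout is not None:
            reader[3] = self.call_later(timeout, self._expire, reader)
        self.readers.append(reader)
        self._check()

    def data_received(self, data):
        # Would be called from Protocol.dataReceived.
        self.buf += data
        self._check()

    def _take(self, count):
        data, self.buf = self.buf[:count], self.buf[count:]
        return data

    def _check(self):
        while self.readers:
            on_data, _, count, delayed = self.readers[0]
            if count is None or len(self.buf) < count:
                break  # wait for more data, or for this reader's timeout
            self.readers.pop(0)
            if delayed is not None:
                delayed.cancel()
            # count=0 means "everything currently buffered"
            on_data(self._take(count or len(self.buf)))

    def _expire(self, reader):
        at_front = self.readers[0] is reader
        self.readers = [r for r in self.readers if r is not reader]
        on_data, on_timeout, count, _ = reader
        if count is None:
            # Only the front reader may take buffered bytes; otherwise it would
            # steal data that earlier readers are still waiting for.
            on_data(self._take(len(self.buf)) if at_front else b"")
        else:
            on_timeout()
        self._check()
```

In a Twisted protocol, `on_data` and `on_timeout` would be a Deferred's `callback` and `errback`, and in tests `twisted.internet.task.Clock().callLater` can stand in for the reactor.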

Upvotes: 0

Jean-Paul Calderone

There have been a couple of attempts at implementing APIs like this one over the years. None have gained any traction, and I think they've all been abandoned at this point.

In principle, this isn't difficult to implement. You're just transforming the dataReceived callback (a push-style API) into a pull-style API.
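To make the push-to-pull idea concrete, here is a minimal framework-free sketch that uses a Python generator instead of Deferreds (a swapped-in technique, not one proposed in either answer): the protocol logic yields the number of bytes it wants next, and `dataReceived` pushes bytes in and resumes the generator once enough have arrived. `GeneratorProtocol`, `write` and `sent` are hypothetical names; `write` stands in for `transport.write`.

```python
class GeneratorProtocol:
    """Hypothetical adapter: bytes are pushed in via dataReceived, and the
    script pulls them out by yielding how many bytes it needs next."""

    def __init__(self, script):
        self.buf, self.sent = b"", []
        self.gen = script(self)
        self.wanted = self._resume(None)  # run the script up to its first yield

    def write(self, data):
        # Stand-in for self.transport.write in a real Twisted Protocol.
        self.sent.append(data)

    def _resume(self, value):
        try:
            return self.gen.send(value)
        except StopIteration:
            return None  # script finished, no more reads wanted

    def dataReceived(self, data):
        self.buf += data
        while self.wanted is not None and len(self.buf) >= self.wanted:
            chunk, self.buf = self.buf[:self.wanted], self.buf[self.wanted:]
            self.wanted = self._resume(chunk)


def login(proto):
    # The question's login dialogue, written as sequential pull-style code.
    proto.write(b"login")
    if (yield 5) != b"user:":
        raise ValueError("unexpected prompt")
    proto.write(b"admin")
    if (yield 9) != b"password:":
        raise ValueError("unexpected prompt")
    proto.write(b"hunter2")
```

`GeneratorProtocol(login)` sends `b"login"` immediately and replies to each prompt as soon as its bytes have arrived, however they are split across `dataReceived` calls. The sketch has no timeouts, and nothing tells the generator if the connection is lost.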

In practice, the resulting code is fragile and tends to contain more bugs.

I think the problem you're trying to address is that dataReceived is a very low-level primitive for parsing a stream of bytes.

There are a number of possible solutions to this. You could try building a higher-level, protocol-specific tool that knows about aspects of your protocol (this is basically what all of the protocol implementations in Twisted do). You could also look at third-party libraries such as tubes, which provides a different abstraction for dealing with byte streams.
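As an illustration of the first suggestion, the sketch below keeps the push-style `dataReceived` but makes the parser aware of the login dialogue from the question: each state names the prompt it expects and the reply to send. It is framework-free so it can run on its own; in a Twisted `Protocol` subclass, `write` would be `self.transport.write` and the initial `b"login"` would be sent from `connectionMade`. `LoginClient` and its state names are hypothetical.

```python
class LoginClient:
    """Hypothetical push-style parser that knows the login dialogue,
    so dataReceived only has to buffer bytes and match the next prompt."""

    # state -> (expected prompt, reply to send, next state)
    STEPS = {
        "want_user": (b"user:", b"admin", "want_password"),
        "want_password": (b"password:", b"hunter2", "logged_in"),
    }

    def __init__(self, write):
        self.write = write  # e.g. self.transport.write in a Twisted Protocol
        self.buf = b""
        self.state = "want_user"
        self.write(b"login")

    def dataReceived(self, data):
        self.buf += data
        while self.state in self.STEPS:
            prompt, reply, next_state = self.STEPS[self.state]
            if len(self.buf) < len(prompt):
                return  # wait for the rest of the prompt
            if not self.buf.startswith(prompt):
                raise ValueError("unexpected data in state %s: %r" % (self.state, self.buf))
            self.buf = self.buf[len(prompt):]
            self.write(reply)
            self.state = next_state
```

Because the state lives in ordinary attributes rather than in a suspended callback chain, each step can be tested by feeding bytes directly, split at arbitrary points.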

Upvotes: 1
