GP89

Reputation: 6730

Streaming HTTP body over FTP with twisted

I have a custom FTP server that talks to an API for folder listings and so on, and the API returns files as URLs. I'm trying to open an HTTP stream to each of these URLs and feed the data back through FTP to the client (in a non-blocking way), but I can't figure out how to hook it up.

I've tried to put together a minimal example to better explain my issue. In the example it starts a local FTP server on port 2121 which lists the local file system, but when downloading a file it returns the content body of www.yahoo.com instead of the file data.

I tried buffering the data through an io.BytesIO object, but no data is sent across. I'm wondering if this is the right approach or if it's because the read pointer is always at the end of the file object perhaps?

Here's the example code:

import io
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell, _FileReader
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess, FilePasswordDB
from twisted.internet import defer

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])

class StreamWriter(Protocol):
    def __init__(self, finished, stream):
        self.finished = finished
        self.stream = stream

    def dataReceived(self, bytes):
        self.stream.write(bytes)

    def connectionLost(self, reason):
        print 'Finished receiving body:', reason.type, reason.value
        self.finished.callback(None)


def streamBody(response, stream):
    finished = Deferred()
    response.deliverBody(StreamWriter(finished, stream))
    return finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")

    stream = io.BytesIO()
    d.addCallback(lambda resp: streamBody(resp, stream))
    d.addErrback(log.err)

    return defer.succeed(_FileReader(stream))

def main():

    FTPAnonymousShell.openForReading = openForReading

    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])

    f = FTPFactory(p)

    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()

Edit

class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")

    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)

    return d

Updated runnable example:

from twisted.python import log
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ConsumerToProtocolAdapter, connectionDone
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder, HTTPConnectionPool, HTTPClientFactory
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess
from twisted.internet import defer
from twisted.internet.error import ConnectionDone
from twisted.web._newclient import ResponseDone

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
# pool= HTTPConnectionPool(reactor,persistent=True)
# pool.maxPersistentPerHost = 2
# agent= Agent(reactor,pool=pool,connectTimeout=5)

class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        print "finished"
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        print consumer
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.testtest.com")
    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)

    return d

def main():

    FTPAnonymousShell.openForReading = openForReading

    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])

    f = FTPFactory(p)

    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()

Upvotes: 0

Views: 788

Answers (1)

Jean-Paul Calderone

Reputation: 48335

if it's because the read pointer is always at the end of the file object perhaps?

Probably this. You have two things going on simultaneously. The HTTP client is writing to the BytesIO instance and the FTP client is reading from it. _FileReader (a private API, an implementation detail of Twisted's FTP library, not something you should actually be using) is for reading from a file that's complete already - not a file that's growing as it's being read from.
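
The shared read/write position can be seen with the standard library alone. This is a minimal sketch, not the FTP server's actual code path: the chunk size and payload are made up, and the assumption is that the FTP side reads before the HTTP side has written anything, then again afterwards. Both reads come back empty, which a reader that expects a complete file would presumably treat as end of file.

```python
import io

stream = io.BytesIO()

# The FTP side reads first, before any HTTP data has arrived.
early = stream.read(1024)      # nothing has been buffered yet

# The HTTP side then writes; this advances the single shared position.
stream.write(b"response body")
late = stream.read(1024)       # the position is already at the end

# Only an explicit rewind makes the written data visible to a reader.
stream.seek(0)
rewound = stream.read(1024)
```

Here `early` and `late` are both empty byte strings, and only `rewound` contains the payload.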

Fortunately there's no need to go through the asynchronous-unfriendly file interface. Look at the type that openForReading is supposed to return - an IReadFile provider. IReadFile has one method, send, which accepts an object that provides IConsumer.

On the other side, you have deliverBody which accepts an IProtocol. This protocol has the HTTP response body delivered to it. This is the data you want to give to the IConsumer passed to IReadFile.send.
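
The hand-off between the two interfaces can be sketched without Twisted. Everything below is a hypothetical stand-in: `ListConsumer`, `FakeResponse`, `BodyToConsumer` and `ResponseFile` only mirror the method names (`write`, `deliverBody`, `dataReceived`, `connectionLost`, `send`) and run synchronously, whereas the real objects are driven by the reactor and `send` returns a Deferred.

```python
class ListConsumer(object):
    """Stand-in for the consumer that the FTP side passes to send()."""
    def __init__(self):
        self.chunks = []

    def write(self, data):
        self.chunks.append(data)


class BodyToConsumer(object):
    """Protocol-shaped object that forwards each body chunk to a consumer."""
    def __init__(self, consumer, on_done):
        self.consumer = consumer
        self.on_done = on_done

    def dataReceived(self, data):
        self.consumer.write(data)

    def connectionLost(self, reason=None):
        self.on_done()


class FakeResponse(object):
    """Stand-in for an HTTP response that exposes deliverBody()."""
    def __init__(self, chunks):
        self._chunks = chunks

    def deliverBody(self, protocol):
        # Push every chunk to the protocol, then report the end of the body.
        for chunk in self._chunks:
            protocol.dataReceived(chunk)
        protocol.connectionLost()


class ResponseFile(object):
    """Read-file-shaped object whose send() streams the response body."""
    def __init__(self, response):
        self.response = response
        self.done = False

    def _finished(self):
        self.done = True

    def send(self, consumer):
        self.response.deliverBody(BodyToConsumer(consumer, self._finished))


consumer = ListConsumer()
reader = ResponseFile(FakeResponse([b"<html>", b"</html>"]))
reader.send(consumer)
```

After `send` returns, `consumer.chunks` holds the body chunks in order and `reader.done` is set. No intermediate file object is involved at any point.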

So instead of trying to make these two pieces work together with a BytesIO, make them work together using the two interfaces involved: IProtocol and IConsumer. Here's a sketch (plenty of bugs in it but the general shape is correct):

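from zope.interface import implementer

from twisted.internet.defer import Deferred
from twisted.internet.error import ConnectionDone
from twisted.internet.interfaces import IPushProducer
from twisted.internet.protocol import ConsumerToProtocolAdapter
from twisted.protocols.ftp import IReadFile
from twisted.web._newclient import ResponseDone

class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = Deferred()
        self.transport = None

    def makeConnection(self, transport):
        # The adapter's own makeConnection discards the transport; keep it
        # so that send() can register it as the producer.
        self.transport = transport

    def connectionLost(self, reason):
        # A fully delivered HTTP body is reported as ResponseDone.
        reason.trap(ConnectionDone, ResponseDone)
        self.finished.callback(None)

@implementer(IReadFile, IPushProducer)
class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        # Lazy hack.
        # This code probably belongs in protocol.connectionMade instead.
        self._producer = protocol.transport
        consumer.registerProducer(self._producer, streaming=True)
        protocol.finished.addCallback(
            lambda ignored: consumer.unregisterProducer()
        )
        return protocol.finished

    def pauseProducing(self):
        self._producer.pauseProducing()

    def resumeProducing(self):
        self._producer.resumeProducing()

    def stopProducing(self):
        self._producer.stopProducing()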

Note that by implementing IPushProducer here, we get flow control between the HTTP and FTP connections (so that memory usage on the server will be bounded, even if the HTTP connection transfers data much more quickly than the FTP connection). That's a pretty cool thing and it's nice that it's only a handful of extra lines to implement. The slightly less cool thing is that you must make the unregisterProducer call at the right time. The FTP protocol implementation uses this as an indication that the data has been completely transferred. This is probably not documented sufficiently in Twisted (and that's an oversight that should be corrected).
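
The effect of that flow control can be illustrated with a toy simulation. This is only a sketch under stated assumptions: `PausableSource` and `SlowConsumer` are hypothetical stand-ins rather than Twisted classes, the buffer limit of 2 is arbitrary, and the loop takes the place of the reactor. The method names mirror the producer and consumer interfaces discussed above.

```python
class PausableSource(object):
    """Hypothetical push producer: emits chunks until paused or exhausted."""
    def __init__(self, chunks):
        self.pending = list(chunks)
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False

    def produce_into(self, consumer):
        while self.pending and not self.paused:
            consumer.write(self.pending.pop(0))


class SlowConsumer(object):
    """Hypothetical consumer with a bounded buffer that drains slowly."""
    def __init__(self, limit):
        self.limit = limit
        self.buffer = []
        self.sent = []
        self.peak = 0
        self.producer = None
        self.finished = False

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None
        self.finished = True

    def write(self, data):
        self.buffer.append(data)
        self.peak = max(self.peak, len(self.buffer))
        if len(self.buffer) >= self.limit:
            self.producer.pauseProducing()   # buffer full: apply back-pressure

    def drain_one(self):
        if self.buffer:
            self.sent.append(self.buffer.pop(0))
        if self.producer is not None and len(self.buffer) < self.limit:
            self.producer.resumeProducing()  # room again: let data flow


source = PausableSource([b"a", b"b", b"c", b"d", b"e"])
sink = SlowConsumer(limit=2)
sink.registerProducer(source, streaming=True)

while source.pending or sink.buffer:
    source.produce_into(sink)   # fast side: runs until it is paused
    sink.drain_one()            # slow side: sends one chunk per turn
sink.unregisterProducer()       # marks the transfer as complete
```

All five chunks arrive in order, yet `sink.peak` never exceeds the limit of 2, which is the bounded-memory property described above. The final `unregisterProducer` call plays the role of the completion signal.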

Upvotes: 1
