I have a custom FTP server that talks to an API for folder listings etc., and the API returns files as URLs. I'm trying to open an HTTP stream to these URLs and feed the data back through FTP to the client (in a non-blocking way), but I can't figure out how to hook it up.
I've tried to put together a minimal example to better explain my issue. It starts a local FTP server on port 2121 that lists the local file system, but when you download a file it returns the body of www.yahoo.com instead of the file's data.
I tried buffering the data through an io.BytesIO object, but no data is sent across. I'm wondering whether this is the right approach, or whether the problem is that the read pointer is always at the end of the file object.
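The read-pointer hypothesis is easy to check in isolation. A BytesIO object has a single position shared by reads and writes, so after one side writes, the position sits at the end and an immediate read on the same object returns nothing. Here is a minimal stdlib-only sketch (Python 3 syntax; the byte strings are made up):

```python
import io

stream = io.BytesIO()

# The "HTTP side" writes; every write advances the single shared position.
stream.write(b"<html>")
stream.write(b"hello")

# The "FTP side" reads from the same object: the position is already at
# the end, so read() finds nothing.
print(stream.tell())  # 11
print(stream.read())  # b''

# Rewinding shows the bytes really were written.
stream.seek(0)
print(stream.read())  # b'<html>hello'
```

Rewinding alone would not fix the real code, though. The writer and reader still share one position, and the reader can run before all of the body has arrived.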
Here's the example code:
import io
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell, _FileReader
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess, FilePasswordDB
from twisted.internet import defer

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])

class StreamWriter(Protocol):
    def __init__(self, finished, stream):
        self.finished = finished
        self.stream = stream

    def dataReceived(self, bytes):
        self.stream.write(bytes)

    def connectionLost(self, reason):
        print 'Finished receiving body:', reason.type, reason.value
        self.finished.callback(None)

def streamBody(response, stream):
    finished = Deferred()
    response.deliverBody(StreamWriter(finished, stream))
    return finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")
    stream = io.BytesIO()
    d.addCallback(lambda resp: streamBody(resp, stream))
    d.addErrback(log.err)
    return defer.succeed(_FileReader(stream))

def main():
    FTPAnonymousShell.openForReading = openForReading
    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])
    f = FTPFactory(p)
    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()
Edit
class FinishNotifier(ProtocolToConsumerAdapter, Protocol):
    def __init__(self, original):
        ProtocolToConsumerAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")
    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)
    return d
Updated runnable example:
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ConsumerToProtocolAdapter, connectionDone
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder, HTTPConnectionPool, HTTPClientFactory
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess
from twisted.internet import defer
from twisted.internet.error import ConnectionDone
from twisted.web._newclient import ResponseDone

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
# pool= HTTPConnectionPool(reactor,persistent=True)
# pool.maxPersistentPerHost = 2
# agent= Agent(reactor,pool=pool,connectTimeout=5)

class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        print "finished"
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        print consumer
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.testtest.com")
    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)
    return d

def main():
    FTPAnonymousShell.openForReading = openForReading
    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])
    f = FTPFactory(p)
    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()
if it's because the read pointer is always at the end of the file object perhaps?
Probably this. You have two things going on simultaneously: the HTTP client is writing to the BytesIO instance, and the FTP side is reading from it. _FileReader (a private API, an implementation detail of Twisted's FTP library, not something you should actually be using) is meant for reading from a file that is already complete, not a file that is growing while it's being read.
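Even giving the reader its own read position would not be enough. A reader built for complete files treats an empty read as end of file (Twisted's FileSender, which _FileReader uses, behaves this way), so if it runs before the writer has finished, it stops early. The simulation below is stdlib-only Python 3. GrowingBuffer and send_until_eof are hypothetical names for illustration, not Twisted APIs.

```python
import io


class GrowingBuffer:
    """Hypothetical buffer: the writer appends, the reader keeps its own offset."""

    def __init__(self):
        self._buf = io.BytesIO()
        self._read_pos = 0

    def append(self, data):
        self._buf.seek(0, io.SEEK_END)
        self._buf.write(data)

    def read(self, size):
        self._buf.seek(self._read_pos)
        chunk = self._buf.read(size)
        self._read_pos += len(chunk)
        return chunk


def send_until_eof(source, size=4):
    """Read chunks until an empty read, like a sender for complete files."""
    sent = []
    while True:
        chunk = source.read(size)
        if not chunk:
            # An empty read is indistinguishable from end of file.
            break
        sent.append(chunk)
    return b"".join(sent)


buf = GrowingBuffer()
buf.append(b"first part, ")   # only part of the body has arrived
early = send_until_eof(buf)    # the reader runs now and sees "EOF"
buf.append(b"second part")     # the rest arrives too late
print(early)                   # b'first part, '
```

The missing bytes are still in the buffer afterwards. The transfer simply ended before they arrived.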
Fortunately there's no need to go through the asynchronous-unfriendly file interface. Look at the type that openForReading is supposed to return: a Deferred that fires with an IReadFile provider. IReadFile has one method, send, which accepts an object that provides IConsumer.
On the other side, you have deliverBody, which accepts an IProtocol. This protocol has the HTTP response body delivered to it, and that body is the data you want to give to the IConsumer passed to IReadFile.send.
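Stripped of Twisted, the bridge is an object that looks like a protocol to deliverBody and forwards everything to a consumer. Below is a stdlib-only Python 3 simulation of the order of calls. RecordingConsumer, BodyToConsumer and FakeResponse are hypothetical stand-ins. In real code, the response comes from Agent.request and the consumer is the one passed to IReadFile.send. Also, connectionLost receives a plain string here instead of a Failure.

```python
class RecordingConsumer:
    """Hypothetical stand-in for the IConsumer that FTP passes to send()."""

    def __init__(self):
        self.calls = []

    def registerProducer(self, producer, streaming):
        self.calls.append(("register", streaming))

    def write(self, data):
        self.calls.append(("write", data))

    def unregisterProducer(self):
        self.calls.append(("unregister",))


class BodyToConsumer:
    """Protocol-shaped adapter: body bytes in, consumer.write() out."""

    def __init__(self, consumer, on_done):
        self.consumer = consumer
        self.on_done = on_done

    def makeConnection(self, transport):
        # The transport produces the body, so register it with the
        # consumer as a streaming (push) producer.
        self.consumer.registerProducer(transport, True)

    def dataReceived(self, data):
        self.consumer.write(data)

    def connectionLost(self, reason):
        # End of body: tell the consumer the transfer is complete.
        self.consumer.unregisterProducer()
        self.on_done(reason)


class FakeResponse:
    """Hypothetical response that delivers a fixed body in chunks."""

    def __init__(self, chunks):
        self.chunks = chunks

    def deliverBody(self, protocol):
        protocol.makeConnection(object())  # stand-in transport
        for chunk in self.chunks:
            protocol.dataReceived(chunk)
        protocol.connectionLost("done")  # real code passes a Failure


consumer = RecordingConsumer()
finished = []
FakeResponse([b"abc", b"def"]).deliverBody(BodyToConsumer(consumer, finished.append))
print(consumer.calls)
# [('register', True), ('write', b'abc'), ('write', b'def'), ('unregister',)]
```

Registering the producer from makeConnection keeps the registration next to the moment the body's transport becomes known.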
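So instead of trying to make these two pieces work together with a BytesIO, make them work together using the two interfaces involved: IProtocol and IConsumer. Here's a sketch (untested, but the general shape is correct):
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer
from twisted.internet.protocol import ConsumerToProtocolAdapter
from twisted.protocols.ftp import IReadFile
from twisted.web.client import ResponseDone
from twisted.web.http import PotentialDataLoss

class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()
        self.transport = None

    def makeConnection(self, transport):
        # The base class ignores the transport; keep it, it produces the HTTP body.
        self.transport = transport

    def connectionLost(self, reason):
        # A complete body ends with ResponseDone (or PotentialDataLoss when
        # the server sent no Content-Length); anything else is an error.
        if reason.check(ResponseDone, PotentialDataLoss):
            self.finished.callback(None)
        else:
            self.finished.errback(reason)

@implementer(IReadFile, IPushProducer)
class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response
        self._producer = None

    def send(self, consumer):
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        # Lazy hack: this probably belongs in protocol.connectionMade instead.
        self._producer = protocol.transport
        consumer.registerProducer(self, streaming=True)
        def done(result):
            # FTP takes unregisterProducer as "all data has been transferred".
            consumer.unregisterProducer()
            return result
        protocol.finished.addBoth(done)
        return protocol.finished

    def pauseProducing(self):
        self._producer.pauseProducing()
    def resumeProducing(self):
        self._producer.resumeProducing()
    def stopProducing(self):
        self._producer.stopProducing()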
Note that by implementing IPushProducer here, we get flow control between the HTTP and FTP connections, so memory usage on the server stays bounded even if the HTTP connection transfers data much faster than the FTP connection. That's a pretty cool thing, and it's nice that it takes only a handful of extra lines. The slightly less cool thing is that you must make the unregisterProducer call at the right time: the FTP protocol implementation uses it as the indication that the data has been completely transferred. This is probably not documented sufficiently in Twisted (an oversight that should be corrected).
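The flow-control idea can be simulated without any networking. In the stdlib-only Python 3 sketch below, SlowConsumer plays the FTP data connection. It calls pauseProducing when its buffer reaches a high-water mark and resumeProducing after the buffer drains, so buffered data stays bounded however many chunks the producer has. ChunkProducer plays the HTTP side, and its final unregisterProducer call marks the transfer as complete. The class names, the 8-byte limit and the drain() loop are hypothetical. In Twisted, the pause and resume calls come from the transport rather than an explicit drain().

```python
class ChunkProducer:
    """Hypothetical push producer: writes chunks until paused or exhausted."""

    def __init__(self, consumer, chunks):
        self.consumer = consumer
        self.chunks = list(chunks)
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        while self.chunks and not self.paused:
            self.consumer.write(self.chunks.pop(0))
        if not self.chunks:
            # Nothing left: signal that the transfer is complete.
            self.consumer.unregisterProducer()

    def stopProducing(self):
        self.chunks = []


class SlowConsumer:
    """Hypothetical consumer that pauses its producer at a high-water mark."""

    def __init__(self, high_water):
        self.high_water = high_water
        self.buffer = b""
        self.received = b""
        self.peak = 0
        self.producer = None
        self.finished = False

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None
        self.finished = True  # comparable to FTP closing the data connection

    def write(self, data):
        self.buffer += data
        self.peak = max(self.peak, len(self.buffer))
        if len(self.buffer) >= self.high_water:
            self.producer.pauseProducing()

    def drain(self):
        """Pretend the network sent everything buffered, then ask for more."""
        self.received += self.buffer
        self.buffer = b""
        if self.producer is not None:
            self.producer.resumeProducing()


consumer = SlowConsumer(high_water=8)
producer = ChunkProducer(consumer, [b"abcd"] * 5)
consumer.registerProducer(producer, streaming=True)
producer.resumeProducing()  # start pushing
while not consumer.finished or consumer.buffer:
    consumer.drain()
print(consumer.peak)           # 8
print(len(consumer.received))  # 20
```

All 20 bytes get through, but the consumer never holds more than 8 at once. The transfer counts as finished only once unregisterProducer has been called.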