user1650177

Reputation: 455

tornado: AsyncHttpClient.fetch from an iterator?

I'm trying to write a web crawler and want to make HTTP requests as quickly as possible. tornado's AsyncHttpClient seems like a good choice, but all the example code I've seen (e.g. https://stackoverflow.com/a/25549675/1650177) basically calls AsyncHttpClient.fetch on a huge list of URLs and lets tornado queue them up and eventually make the requests.

But what if I want to process an indefinitely long (or just a really big) list of URLs from a file or the network? I don't want to load all the URLs into memory.

I've Googled around but can't seem to find a way to feed AsyncHttpClient.fetch from an iterator. I did, however, find a way to do what I want using gevent: http://gevent.org/gevent.threadpool.html#gevent.threadpool.ThreadPool.imap. Is there a way to do something similar in tornado?
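For reference, here's roughly what I mean with gevent (fetch_url and urls_from_file are placeholder names, and requests just stands in for whatever blocking HTTP call you'd use); as far as I can tell, imap consumes the generator lazily instead of materializing the whole list:

from gevent.threadpool import ThreadPool
import requests  # placeholder: any blocking HTTP library works inside the thread pool

def fetch_url(url):
    return requests.get(url)

def urls_from_file(path):
    # Generator, so the whole URL list never has to be in memory.
    with open(path) as f:
        for line in f:
            yield line.strip()

pool = ThreadPool(10)
# imap pulls URLs from the generator as worker threads free up.
for response in pool.imap(fetch_url, urls_from_file('urls.txt')):
    print(response.status_code)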

One solution I've thought of is to queue up only so many URLs initially and then add logic to queue up more as fetch operations complete, but I'm hoping there's a cleaner solution.
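To make that concrete, here's a rough sketch of the refill-on-completion idea: a fixed number of fetch loops, each pulling its next URL from a shared iterator as soon as its previous fetch finishes (CONCURRENCY, fetch_from and crawl are just names made up for this sketch):

from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

CONCURRENCY = 10
AsyncHTTPClient.configure(None, max_clients=CONCURRENCY)

@gen.coroutine
def fetch_from(url_iter, client):
    # Each of the CONCURRENCY coroutines takes its next URL from the
    # shared iterator only after its previous fetch completes, so only
    # a handful of URLs are checked out of the iterator at a time.
    for url in url_iter:
        try:
            yield client.fetch(url)
            print('fetched', url)
        except Exception:
            print('failed to fetch', url)

@gen.coroutine
def crawl(url_iter):
    client = AsyncHTTPClient()
    # Wait for all the fetch loops to exhaust the iterator.
    yield [fetch_from(url_iter, client) for _ in range(CONCURRENCY)]

if __name__ == '__main__':
    urls = (line.strip() for line in open('urls.txt'))
    IOLoop.current().run_sync(lambda: crawl(urls))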

Any help or recommendations would be appreciated!

Upvotes: 4

Views: 657

Answers (1)

Ben Darnell

Reputation: 22154

I would do this with a Queue and multiple workers, in a variation on https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py

import tornado.queues
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

NUM_WORKERS = 10
QUEUE_SIZE = 100
q = tornado.queues.Queue(QUEUE_SIZE)
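# None keeps the default client implementation; max_clients caps simultaneous requests.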
AsyncHTTPClient.configure(None, max_clients=NUM_WORKERS)
http_client = AsyncHTTPClient()

@gen.coroutine
def worker():
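    # Each worker runs forever, handling one URL from the queue at a time.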
    while True:
        url = yield q.get()
        try:
            response = yield http_client.fetch(url)
            print('got response from', url)
        except Exception:
            print('failed to fetch', url)
        finally:
            q.task_done()

@gen.coroutine
def main():
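    # Start the workers first; they wait on q.get() until URLs arrive.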
    for i in range(NUM_WORKERS):
        IOLoop.current().spawn_callback(worker)
    with open("urls.txt") as f:
        for line in f:
            url = line.strip()
            # When the queue fills up, stop here to wait instead
            # of reading more from the file.
            yield q.put(url)
    yield q.join()

if __name__ == '__main__':
    IOLoop.current().run_sync(main)
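
The open file in main() is just one example of an iterator: since yield q.put(url) waits whenever QUEUE_SIZE items are already buffered, the same pattern works for any generator without pulling everything into memory. A minimal variation (main_from_iterable is a name introduced here for illustration):

@gen.coroutine
def main_from_iterable(urls):
    for i in range(NUM_WORKERS):
        IOLoop.current().spawn_callback(worker)
    # urls can be any iterable or generator; the bounded queue applies
    # backpressure, so at most QUEUE_SIZE URLs are buffered at once.
    for url in urls:
        yield q.put(url)
    yield q.join()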

Upvotes: 4
