felix001

Reputation: 16691

Limiting Threads within Python Threading, Queue

I'm using the following code to multithread urllib2. However, what is the best way to limit the number of threads it consumes?

import threading
import urllib2
import Queue

class ApiMultiThreadHelper:

    def __init__(self,api_calls):
        self.q = Queue.Queue()
        self.api_datastore = {}
        self.api_calls = api_calls
        self.userpass = '#####'  # base64-encoded "user:password" for HTTP Basic auth

    def query_api(self,q,api_query):
        self.q.put(self.issue_request(api_query))

    def issue_request(self,api_query):

        self.api_datastore.update({api_query:{}})

        for lookup in ["call1","call2"]:
            query = api_query+lookup

            request = urllib2.Request(query)
            request.add_header("Authorization", "Basic %s" % self.userpass)
            f = urllib2.urlopen(request)
            response = f.read()
            f.close()

            self.api_datastore[api_query].update({lookup:response})

        return True

    def go(self):
        threads = []
        for i in self.api_calls:
            t = threading.Thread(target=self.query_api, args = (self.q,i))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

Upvotes: 1

Views: 132

Answers (1)

freakish

Reputation: 56467

You should use a thread pool. Here's an implementation I made years ago (Python 3.x friendly):

import traceback
from threading import Thread
try:
    import queue as Queue  # Python 3.x
except ImportError:
    import Queue  # Python 2.x

class ThreadPool(object):
    def __init__(self, no=10):
        self.alive = True
        self.tasks = Queue.Queue()
        self.threads = []
        for _ in range(no):
            t = Thread(target=self.worker)
            t.start()
            self.threads.append(t)

    def worker(self):
        while self.alive:
            try:
                fn, args, kwargs = self.tasks.get(timeout=0.5)
            except Queue.Empty:
                continue
            except ValueError:
                self.tasks.task_done()
                continue

            try:
                fn(*args, **kwargs)
            except Exception:
                # might wanna add some better error handling
                traceback.print_exc()

            self.tasks.task_done()

    def add_job(self, fn, args=(), kwargs=None):  # immutable defaults
        self.tasks.put((fn, args, kwargs or {}))

    def join(self):
        self.tasks.join()

    def deactivate(self):
        self.alive = False
        for t in self.threads:
            t.join()

You can also find a similar class in the multiprocessing.pool module (don't ask me why it lives there); see the sketch at the end of this answer. You can then refactor your code like this:

def go(self):
    tp = ThreadPool(20)  # <-- 20 thread workers
    for i in self.api_calls:
        tp.add_job(self.query_api, args=(self.q, i))
    tp.join()
    tp.deactivate()

The number of threads is now fixed a priori.
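
For comparison, here's a minimal sketch of the same go() rewritten against the stdlib multiprocessing.pool.ThreadPool mentioned above (apply_async, close and join are the real Pool API; the worker count of 20 just mirrors the example):

from multiprocessing.pool import ThreadPool as StdThreadPool  # aliased to avoid shadowing the class above

def go(self):
    pool = StdThreadPool(processes=20)  # 20 worker threads
    for i in self.api_calls:
        pool.apply_async(self.query_api, args=(self.q, i))
    pool.close()  # no further tasks will be submitted
    pool.join()   # block until every worker has finished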

Upvotes: 1
