VanDir
VanDir

Reputation: 2030

How to create and use an http parallel manager in Python?

I'm new to Python and I would like to create a class, ParallelHttpHandler, that handles all the HTTP requests it receives in parallel (multi-threaded). The clients (classes in other parts of the code that use ParallelHttpHandler) have to register each HTTP request they want to make by passing some parameters, such as the method, the data and the onsuccess callback.

The following is an example of that class:

import threading
import time
import logging
import random
import Queue
import sys
import requests

class ParallelHttpHandler(object):
    """Front end that hands HTTP requests to a pool of worker threads.

    Each request is described by a dict and placed on a shared queue; the
    RequestConsumerThread workers started in ``__init__`` take requests off
    that queue and invoke the ``onsuccess`` / ``onerror`` callbacks while
    holding ``callback_lock``.
    """

    # Number of RequestConsumerThread workers started per handler.
    POOL_SIZE = 10

    def __init__(self):
        """Create the shared queue and callback lock, then start the workers."""
        self.requests_queue = Queue.Queue()
        self.callback_lock = threading.RLock()
        self.pool_threads = [
            RequestConsumerThread(self.requests_queue, self.callback_lock)
            for _ in xrange(ParallelHttpHandler.POOL_SIZE)
        ]
        for thread in self.pool_threads:
            thread.start()

    def http_get(self, url, data, headers=None, onsuccess=None, onerror=None):
        """Queue a request with method ``"get"``; see ``http_request``."""
        self.http_request("get", url, data, headers, onsuccess, onerror)

    def http_post(self, url, data, headers=None, onsuccess=None, onerror=None):
        """Queue a request with method ``"post"``; see ``http_request``."""
        self.http_request("post", url, data, headers, onsuccess, onerror)

    def http_request(self, method, url, data, headers, onsuccess, onerror):
        """Put one request description on the queue for the workers.

        Args:
            method: ``"get"`` is performed as a GET by the workers; any
                other value is performed as a POST.
            url: Target URL.
            data: Payload forwarded to the worker's request call.
            headers: Dict of HTTP headers, or None to let the worker use
                its defaults.
            onsuccess: Callable receiving the worker's response value, or
                None.
            onerror: Callable receiving the raised exception, or None.
        """
        # No full() pre-check: it is racy (the answer can be stale by the
        # time put() runs) and would silently drop the request.  put()
        # blocks by itself if the queue ever has a size limit and is full.
        self.requests_queue.put({
            "method": method,
            "url": url,
            "data": data,
            "headers": headers,
            "onsuccess": onsuccess,
            "onerror": onerror,
        })

    def wait_all(self):
        """Block until every queued request has been marked done by a worker."""
        self.requests_queue.join()

class RequestConsumerThread(threading.Thread):
    """Daemon worker that takes request dicts off a queue and performs them.

    Each request dict is expected to carry the keys ``method``, ``url``,
    ``data``, ``headers``, ``onsuccess`` and ``onerror``.  Callbacks are
    invoked while holding ``callback_lock`` so that callbacks from different
    workers never run at the same time.
    """

    # Headers sent when a request does not supply its own (or supplies an
    # empty/falsy value).
    DEFAULT_HTTP_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Accept': 'text/html, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'
    }

    def __init__(self, requests_queue, callback_lock):
        """Store the shared queue and lock and mark the thread as a daemon.

        Args:
            requests_queue: Queue the worker takes request dicts from.
            callback_lock: Lock held while a callback is running.
        """
        super(RequestConsumerThread, self).__init__()
        self.requests_queue = requests_queue
        self.callback_lock = callback_lock
        self.daemon = True

    def http_get(self, url, data=None, headers=None):
        """Perform a GET and return the body text, or None unless status is 200."""
        response = requests.get(url, data=data, headers=headers or RequestConsumerThread.DEFAULT_HTTP_HEADERS)
        return response.text if response.status_code == 200 else None

    def http_post(self, url, data, headers=None):
        """Perform a POST and return the body text, or None unless status is 200."""
        response = requests.post(url, data=data, headers=headers or RequestConsumerThread.DEFAULT_HTTP_HEADERS)
        return response.text if response.status_code == 200 else None

    def run(self):
        """Process requests forever, reporting results through the callbacks.

        An exception raised by the HTTP call or by ``onsuccess`` is passed
        to ``onerror``.  ``task_done()`` is always called so that a
        ``join()`` on the queue can complete.
        """
        while True:
            # get() blocks until a request is available, so the worker
            # sleeps while idle instead of spinning on empty().
            request = self.requests_queue.get()
            try:
                # Perform the HTTP request.
                request_arguments = (request["url"], request["data"], request["headers"])
                if request["method"] == "get":
                    response = self.http_get(*request_arguments)
                else:
                    response = self.http_post(*request_arguments)

                # Run the success callback in a thread-safe context.  The
                # ``with`` block releases the lock even if the callback
                # raises; a manual acquire()/release() pair would leave the
                # reentrant lock held by this thread and stall every other
                # worker's callbacks.
                onsuccess = request["onsuccess"]
                if onsuccess is not None:
                    with self.callback_lock:
                        onsuccess(response)
            except Exception as e:
                onerror = request["onerror"]
                if onerror is not None:
                    with self.callback_lock:
                        # Just invoke the callback; its return value is
                        # not printed.
                        onerror(e)
            finally:
                self.requests_queue.task_done()

Now the problem: when I use this class in a for loop to handle 50 HTTP requests in parallel, the callback that I assign to each request does not behave as I expect.

Example code (the real code is more complicated):

if __name__ == '__main__':
    # Success callback: reports the index it was called with.
    def onsuccess(html_code, index): 
        print "Success: html received. index: " + str(index)
    # Error callback: prints the exception it was given.
    def onerror(e): 
        print "Error: " + str(e)

    http_handler = ParallelHttpHandler()
    urls = ["https://google.com?count=%d" % n for n in range(50)]
    for index, url in enumerate(urls):
        # NOTE: the lambda refers to the loop variable `index` by name; the
        # name is looked up when the callback is finally invoked, not when
        # the lambda is created in this iteration.
        http_handler.http_get(url, None, None, lambda html_code: onsuccess(html_code, index), onerror)

    # Block until every queued request has been marked done.
    http_handler.wait_all()

This code outputs the following:

Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49

Why is it always 49? I know that in Python the passing strategy is call-by-reference, but the code doesn't work even if I pass copy.copy(index).

Upvotes: 0

Views: 42

Answers (1)

freakish
freakish

Reputation: 56467

It's because of late binding and scoping rules (regarding loops) in Python. It is unrelated to copying and to the HTTP code, and only somewhat related to call-by-reference. See this simpler example:

>>> lst = []
>>> for x in range(5):
...     lst.append(lambda : x)
... 
>>> for l in lst:
...     l()
... 
4
4
4
4
4

A solution is to wrap the lambda in a function, which creates a new scope for each call:

>>> lst = []
>>> def factory(x):
...     return lambda : x
... 
>>> for x in range(5):
...     lst.append(factory(x))
... 
>>> for l in lst:
...     l()
... 
0
1
2
3
4

Upvotes: 1

Related Questions