Programmer120

Reputation: 2592

ThreadPoolExecutor runs sequentially rather than with threads

I have the following code:

import json
import math
import requests

def getdata(page, hed, limit):
    data = []
    datarALL = []
    offset = page * limit  # each page reads the next block of `limit` records
    url = 'http://...WithTotal=true&cultureid=1&offset={0}&limit={1}'.format(offset, limit)
    print page
    print url
    responsedata = requests.get(url, data=data, headers=hed, verify=False)
    if responsedata.status_code == 200:  # 200 for successful call
        jsondata = json.loads(responsedata.text)
        if "results" in jsondata:
            if jsondata["results"]:
                datarALL = datarALL + jsondata["results"]
    print "page {} finished".format(page)
    return datarALL


def start(data, auth_token):
    # # ---  Get data from API --
    hed = {'Authorization': 'Bearer ' + auth_token, 'Accept': 'application/json'}

    urlApi = 'http://...WithTotal=true&cultureid=1&offset=0&limit=1'
    responsedata = requests.get(urlApi, data=data, headers=hed, verify=False)
    num_of_records = int(math.ceil(responsedata.json()['total']))
    value_limit = 249  # Number of records per page.
    num_of_pages = num_of_records / value_limit
    print num_of_records
    print num_of_pages
    pages = [i for i in range(0, num_of_pages - 1)]
    from concurrent.futures import ThreadPoolExecutor, as_completed
    datarALL = []
    with ThreadPoolExecutor(max_workers=num_of_pages) as executor:
        futh = [executor.submit(getdata(page, hed, value_limit), page) for page in pages]
        for data in as_completed(futh):
            datarALL = datarALL + data.result()
    return datarALL

Basically, start() creates the pages and getdata() runs once per page. The prints show me:

0
http://...WithTotal=true&cultureid=1&offset=0&limit=249
page 0 finished
1
http://...WithTotal=true&cultureid=1&offset=249&limit=249
page 1 finished
etc...

However, I expected all the pages to be created at the same time, with each one running whenever its thread gets CPU time. What actually happens is that the next page is created only after getdata() finishes, which means the threads are useless here. I should note that each getdata() call takes about 4-5 minutes to finish.

I suspect that the problem is here:

futh = [executor.submit(getdata(page, hed, value_limit), page) for page in pages]

It seems to wait for getdata() to finish before the next loop iteration starts.

How can I fix this so it actually works with threads?

Upvotes: 2

Views: 81

Answers (2)

Gevorg Davoian

Reputation: 524

You have to pass a function (without actually calling it!) to executor.submit. So in your particular case, you should bind the hed and value_limit arguments of getdata to turn it into a function of a single argument, page.

The easiest solution may look like the following:

getdata_partial = lambda page: getdata(page, hed, value_limit)

Then you could use it as shown below:

futh = [executor.submit(getdata_partial, page) for page in pages]

Another possible solution is to use functools.partial. You may find it even more elegant, but the idea is still the same.
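For instance, a minimal sketch using the names from the question:

from functools import partial

getdata_partial = partial(getdata, hed=hed, limit=value_limit)
futh = [executor.submit(getdata_partial, page) for page in pages]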

Upvotes: 1

abarnert

Reputation: 366003

The problem is that you're not executing tasks in the executor at all. Instead, you're calling the 5-minute function, then trying to execute its result as a task:

[executor.submit(getdata(page, hed, value_limit), page) for page in pages]

That getdata(page, hed, value_limit) is a function call: it calls getdata and waits for its return value.

What you need to do is pass the function itself to submit, like this:

executor.submit(getdata, page, hed, value_limit)
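With that change, all the futures are created immediately and run concurrently, and the loop in start() collects each result as it completes. A sketch based on the question's code:

futh = [executor.submit(getdata, page, hed, value_limit) for page in pages]
for fut in as_completed(futh):
    datarALL = datarALL + fut.result()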

I'm not sure what you're trying to do with the extra , page, but if you wanted a list of (future, page) tuples, that would be:

[(executor.submit(getdata, page, hed, value_limit), page) for page in pages]
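Note that as_completed takes futures, not tuples, so if you want to keep each page alongside its future you would typically use a dict instead. A sketch (futures_to_page is just an illustrative name):

futures_to_page = {executor.submit(getdata, page, hed, value_limit): page
                   for page in pages}
for fut in as_completed(futures_to_page):
    page = futures_to_page[fut]
    datarALL = datarALL + fut.result()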

Upvotes: 3
