ariyasas94

Reputation: 29

How to add tqdm here?

How do I add tqdm to the multiprocessing for loop here? Specifically, I want to wrap urls in tqdm():

jobs = []
urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
for url in urls:
    job = pool.apply_async(worker, (url, q))
    jobs.append(job)

for job in jobs:
    job.get()

pool.close()
pool.join()

The suggested solution on GitHub is this:

pbar = tqdm(total=100)
def update(*a):
    pbar.update()
    # tqdm.write(str(a))
for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()

But my iterable is a list of URLs as opposed to a range like in the above. How do I translate the above solution to my for loop?

Upvotes: 1

Views: 887

Answers (2)

Booboo

Reputation: 44213

The easiest solution that is compatible with your current code is to just specify the callback argument to apply_async (and if there is a possibility of an exception in worker, then specify the error_callback argument too).

from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    # So that the progress bar does not advance too quickly
    # for demo purposes:
    import time
    time.sleep(1)

# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    def my_callback(result):
        pbar.update()

    # for this demo:
    #urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
    urls = list('abcdefghijklmnopqrstuvwxyz')
    with tqdm(total=len(urls)) as pbar:
        pool = Pool()
        jobs = [
            pool.apply_async(worker, (url,), callback=my_callback, error_callback=my_callback)
                for url in urls
        ]

        # You can delete the next two statements if you don't need
        # the return value of job.get(), since the calls to
        # pool.close() and pool.join() will wait for all submitted
        # tasks to complete:
        for job in jobs:
            job.get()

        pool.close()
        pool.join()

Or, instead of using apply_async, use imap (or imap_unordered if you do not care about the results or about their order):

from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    import time

    time.sleep(1)  # so that the progress bar does not advance too quickly
    return url


# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    # for this demo:
    #urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
    urls = list('abcdefghijklmnopqrstuvwxyz')
    pool = Pool()
    results = list(tqdm(pool.imap(worker, urls), total=len(urls)))
    print(results)
    pool.close()
    pool.join()
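As a rough sketch of the imap_unordered variant mentioned above, the bar can be advanced once per completed task, in whatever order tasks finish. The helper name run_unordered, the placeholder worker body, and the chunksize value of 4 are assumptions made for illustration and are not part of the original code:

```python
from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    # Placeholder for the real per-URL work (an assumption for this sketch).
    return url.upper()

def run_unordered(pool, urls, chunksize=1):
    """Collect results in completion order, updating the bar once per result."""
    results = []
    with tqdm(total=len(urls)) as pbar:
        # imap_unordered yields each result as soon as it is available,
        # so the bar moves even if the first submitted task is slow.
        for result in pool.imap_unordered(worker, urls, chunksize=chunksize):
            results.append(result)
            pbar.update()
    return results

# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    urls = list('abcdefghijklmnopqrstuvwxyz')
    with Pool() as pool:
        results = run_unordered(pool, urls, chunksize=4)
    # Completion order is not guaranteed, so sort before printing.
    print(sorted(results))
```

With a chunksize greater than 1, results arrive in bursts of up to that many items, so the bar advances in small jumps rather than one task at a time.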

Note

If you won't or can't use apply_async with a callback, then imap_unordered is to be preferred over imap, assuming you don't need the results returned in task-submission order, which imap is obliged to do. The potential problem with imap is that if, for some reason, the first task submitted to the pool were the last to complete, no results could be returned until that first task finished. By then all the other submitted tasks would already have completed, so your progress bar would not move at all and would then suddenly go from 0% to 100% as quickly as you can iterate the results.

Admittedly, the above scenario is an extreme case that is not likely to occur often, but you would still like the progress bar to advance as tasks complete, regardless of the order of completion. If you want that and also want the results back in task-submission order, apply_async with a callback is probably best. The only drawback to apply_async is that if you have a very large number of tasks to submit, they cannot be "chunked up" (see the chunksize argument to imap and imap_unordered) unless you write your own chunking logic.
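For what it's worth, here is a minimal sketch of what such do-it-yourself chunking with apply_async might look like. The names chunked, worker_chunk and run_chunked, the placeholder worker body, and the chunk size of 5 are all hypothetical, chosen only for this illustration:

```python
from multiprocessing import Pool
from tqdm import tqdm

def worker(url):
    # Placeholder for the real per-URL work (an assumption for this sketch).
    return len(url)

def worker_chunk(chunk):
    # Process one whole chunk of URLs inside a single pool task.
    return [worker(url) for url in chunk]

def chunked(items, size):
    # Split a sequence into consecutive slices of at most `size` items.
    return [items[i:i + size] for i in range(0, len(items), size)]

def run_chunked(pool, urls, size):
    """Submit one task per chunk; return results in submission order."""
    with tqdm(total=len(urls)) as pbar:
        def on_done(chunk_results):
            # Advance the bar by the number of URLs in the finished chunk.
            pbar.update(len(chunk_results))

        jobs = [
            pool.apply_async(worker_chunk, (chunk,), callback=on_done)
            for chunk in chunked(urls, size)
        ]
        # job.get() re-raises any exception raised in the worker.
        nested = [job.get() for job in jobs]
    # Flatten the per-chunk lists back into one list.
    return [result for chunk_results in nested for result in chunk_results]

# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
    urls = list('abcdefghijklmnopqrstuvwxyz')
    with Pool() as pool:
        print(run_chunked(pool, urls, size=5))
```

The trade-off is granularity: the bar now moves one chunk at a time, and an exception in any URL fails its whole chunk, so smaller chunks give smoother progress at the cost of more task-submission overhead.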

Upvotes: 1

user31934

Reputation: 27

You can use Parallel and delayed from joblib and use tqdm in the following manner:

from multiprocessing import cpu_count

import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

def process_urls(urls, i):
    url = urls[i]  # the URL this task is responsible for
    # define your function here

Call the function using:

urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
# urls is a NumPy array here, so use len(urls); an ndarray has no .axes attribute.
Parallel(n_jobs=cpu_count(), prefer='processes')(delayed(process_urls)(urls, i) for i in tqdm(range(len(urls))))

Upvotes: 0
