Reputation: 29
How do I add tqdm
for the multiprocessing for loop here. Namely I want to wrap urls
in tqdm()
:
jobs = []
urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
for url in urls:
job = pool.apply_async(worker, (url, q))
jobs.append(job)
for job in jobs:
job.get()
pool.close()
pool.join()
The suggested solution on GitHub is this:
pbar = tqdm(total=100)
def update(*a):
pbar.update()
# tqdm.write(str(a))
for i in range(pbar.total):
pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()
But my iterable is a list of URLs as opposed to a range like in the above. How do I translate the above solution to my for loop?
Upvotes: 1
Views: 887
Reputation: 44213
The easiest solution that is compatible with your current code is to just specify the callback argument to apply_async
(and if there is a possibility of an exception in worker
, then specify the error_callback argument too).
from multiprocessing import Pool
from tqdm import tqdm
def worker(url):
# So that the progress par does not proceed to quickly
# for demo purposes:
import time
time.sleep(1)
# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
def my_callback(result):
pbar.update()
# for this demo:
#urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
urls = list('abcdefghijklmnopqrstuvwxyz')
with tqdm(total=len(urls)) as pbar:
pool = Pool()
jobs = [
pool.apply_async(worker, (url,), callback=my_callback, error_callback=my_callback)
for url in urls
]
# You can delete the next two statements if you don't need
# to save the value of jobs.get() since the calls to
# pool.close() and pool.join() will wait for all submitted
# tasks to complete:
for job in jobs:
job.get()
pool.close()
pool.join()
Or instead of using apply_async
, use imap
(or imap_unordered if you do not care either about the results or the order of the results):
from multiprocessing import Pool
from tqdm import tqdm
def worker(url):
import time
time.sleep(1) # so that the progress par does not proceed to quickly:
return url
# For compatibility with platforms that use the *spawn* method (e.g. Windows):
if __name__ == '__main__':
# for this demo:
#urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
urls = list('abcdefghijklmnopqrstuvwxyz')
pool = Pool()
results = list(tqdm(pool.imap(worker, urls), total=len(urls)))
print(results)
pool.close()
pool.join()
Note
If you won't or can't use apply_async
with a callback, then imap_unordered
is to be preferred over imap
, assuming you don't need to have the results returned in task-submission order, which imap
is obliged to do. The potential problem with imap
is that if for some reason the first task submitted to the pool were the last to complete, no results can be returned until that first submitted task finishes. When that occurs all the other submitted task will have already completed and so your progress bar will not move at all and then it will suddenly go from 0% to 100% as quickly as you can iterate the results.
Admittedly the above scenario is an extreme case not likely to occur too often, but you would still like the progress bar to advance as tasks complete regardless of that order of completion. For this and getting results back in task-submission order apply_async
with a callback is probably best. The only drawback to apply_async
is that if you have a very large number of tasks to submit, they cannot be "chunked up" (see the chunksize argument to imap
and imap_unordered
) without your doing your own chunking logic.
Upvotes: 1
Reputation: 27
You can use Parallel
and Delayed
from Joblib
and use tqdm in the following manner:
from multiprocessing import cpu_count
from joblib import Parallel, delayed
def process_urls(urls,i):
#define your function here
Call function using:
urls = pd.read_csv(dataset, header=None).to_numpy().flatten()
Parallel(n_jobs=cpu_count(), prefer='processes')(delayed(process_urls)(urls, i) for i in tqdm(range(len(urls.axes[0]))))
Upvotes: 0