Reputation: 41
My problem is the following: I have a multiprocessing.pool.ThreadPool object with worker_count workers and a main pqueue from which I feed tasks to the pool.
The flow is as follows: there is a main loop that gets an item of level level from pqueue and submits it to the pool using apply_async. When the item is processed, it generates items of level + 1. The problem is that the pool accepts all tasks and processes them in the order they were submitted.
More precisely, what is happening is that the level 0 items are processed and each generates 100 level 1 items that are retrieved immediately from pqueue and added to the pool, each level 1 item produces 100 level 2 items that are submitted to the pool, and so on, so the items are processed in a BFS manner.
I need to tell the pool not to accept more than worker_count items, in order to give higher-level items a chance to be retrieved from pqueue, so that items are processed in a DFS manner.
The current solution I came up with (sketched below) is: for each submitted task, save the AsyncResult object in an asyncres_list list; before retrieving items from pqueue, I remove the results that have already completed (if any) and check every 0.5 seconds whether the length of asyncres_list is lower than the number of threads in the pool, so that at most thread_number items are being processed at the same time.
I am wondering if there is a cleaner way to achieve this behaviour; I can't seem to find any parameter in the documentation that limits the maximum number of tasks that can be submitted to a pool.
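For reference, here is a minimal sketch of that throttling workaround under simplified assumptions (process_item and the (level, payload) tuples are placeholders for the real work, and the generation of child items is omitted; only the "don't submit more than worker_count at a time" part is shown):

import queue
import time
from multiprocessing.pool import ThreadPool

def process_item(item):               # stand-in for the real per-item work
    time.sleep(0.1)
    return item

worker_count = 4
pool = ThreadPool(worker_count)
pqueue = queue.PriorityQueue()
for i in range(10):
    pqueue.put((0, str(i)))           # (level, payload) pairs, as in the question

asyncres_list = []
while not pqueue.empty() or asyncres_list:
    # drop results that have already completed
    asyncres_list = [r for r in asyncres_list if not r.ready()]
    if len(asyncres_list) < worker_count and not pqueue.empty():
        level, item = pqueue.get()
        asyncres_list.append(pool.apply_async(process_item, (item,)))
    else:
        time.sleep(0.5)               # poll again once a slot may have freed up
pool.close()
pool.join()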
Upvotes: 4
Views: 9335
Reputation: 1865
This other solution is nice. If you'd like more "easy" concurrency, especially for multi-tasking blocking interfaces (e.g. requests), the built-in concurrent.futures module may have what you want:
import concurrent.futures

def worker(num):
    print(f'in worker with {num}')
    return num * 2

with concconcurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    futures = [executor.submit(worker, i) for i in range(10000)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
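If you also need to cap how many tasks are pending at once (the limit the question is after), one possible variation with the same module is to submit in a loop and block on concurrent.futures.wait with FIRST_COMPLETED whenever too many futures are in flight. A minimal sketch (max_pending and worker are illustrative names, not executor parameters):

import concurrent.futures

def worker(num):
    return num * 2

max_pending = 16                      # illustrative cap on in-flight tasks

with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    pending = set()
    for i in range(10000):
        if len(pending) >= max_pending:
            # block until at least one in-flight task finishes
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                print(future.result())
        pending.add(executor.submit(worker, i))
    # drain whatever is still in flight
    for future in concurrent.futures.as_completed(pending):
        print(future.result())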
Upvotes: 1
Reputation: 414225
ThreadPool is a simple tool for a common task. If you want to manage the queue yourself, to get DFS behavior, you could implement the necessary functionality on top of the threading and queue modules directly.
To prevent scheduling the next root task until all tasks spawned by the current task are done ("DFS"-like order), you could use Queue.join():
#!/usr/bin/env python3
import queue
import random
import threading
import time

def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
    for task in iter(q.get, None):  # blocking get until None is received
        try:
            if len(task) < maxlevel:
                for i in range(multiplicity):
                    q.put(task + str(i))  # schedule the next level
            time.sleep(random.random())  # emulate some work
            with lock:
                print(task)
        finally:
            q.task_done()

worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
           for _ in range(worker_count)]
for t in threads:
    t.start()

for task in "01234":  # populate the first level
    q.put(task)
q.join()  # block until all spawned tasks are done

for _ in threads:  # signal workers to quit
    q.put(None)
for t in threads:  # wait until workers exit
    t.join()
The code example is derived from the example in the queue module documentation.
The task at each level spawns multiplicity direct child tasks that spawn their own subtasks until maxlevel is reached.
None is used to signal the workers that they should quit. t.join() is used to wait until the threads exit gracefully. If the main thread is interrupted for any reason then the daemon threads are killed, unless there are other non-daemon threads (you might want to provide a SIGINT handler to signal the workers to exit gracefully on Ctrl+C instead of just dying; see the sketch below).
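For example, one way to get a graceful Ctrl+C without installing a handler via the signal module is to catch KeyboardInterrupt around q.join() and then send the usual None sentinels. This is only a sketch of one possible variant of the shutdown code above:

try:
    q.join()              # block until all spawned tasks are done
except KeyboardInterrupt:
    pass                  # stop waiting; workers finish their current task
for _ in threads:         # signal workers to quit
    q.put(None)           # with LifoQueue the sentinels are served before older tasks
for t in threads:         # wait until workers exit
    t.join()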
queue.LifoQueue() is used to get "Last In First Out" order (it is only approximate because multiple threads produce and consume concurrently).
The maxsize is not set because otherwise the workers may deadlock: a worker blocked in q.put() on a full queue can never return to q.get() to drain it, and you have to put the spawned task somewhere anyway. worker_count background threads are running regardless of how many tasks are in the queue.
Upvotes: 3