Reputation: 13665
cmd is a function that processes the argument x and prints the output to stdout. For example, it may be
def cmd(x):
    print(x)
A serial program calling cmd() looks like the following.
for x in array:
    cmd(x)
To speed up the program, I'd like it to run in parallel. The stdout output can be out of order, but the output for a single x must not be broken up by the output from another x.
There are various ways to implement this in Python. I came up with something like this.
from joblib import Parallel, delayed
Parallel(n_jobs=100)(delayed(cmd)(i) for i in range(100))
Is this the best way to implement this in Python in terms of code simplicity/readability and efficiency?
Also, the above code runs OK on Python 3, but on Python 2 I get the following warning. Is it a problem that may cause errors?
/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/joblib/externals/loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value
  "increase its maximal value", RuntimeWarning)
Thanks.
Upvotes: 2
Views: 341
Reputation:
I would approach the issue in the question with the following code (assuming we are talking about CPU-bound operations):
import multiprocessing as mp
import random

def cmd(value):
    # some CPU-heavy calculation
    for dummy in range(10 ** 8):
        random.random()
    # result
    return "result for {}".format(value)

if __name__ == '__main__':
    data = [val for val in range(10)]
    pool = mp.Pool(4)  # 4 is the number of processes (the number of CPU cores used)
    # the result is obtained only after all the data has been processed
    result = pool.map(cmd, data)
    print(result)
Output:
['result for 0', 'result for 1', 'result for 2', 'result for 3', 'result for 4', 'result for 5', 'result for 6', 'result for 7', 'result for 8', 'result for 9']
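If you want to print each result as soon as its worker finishes, instead of waiting for the whole map() to return, Pool.imap_unordered is a more compact alternative to the manual process/queue version shown in the EDIT below. A minimal sketch reusing the same cmd:

import multiprocessing as mp
import random

def cmd(value):
    # same CPU-heavy placeholder work as above
    for dummy in range(10 ** 8):
        random.random()
    return "result for {}".format(value)

if __name__ == '__main__':
    data = [val for val in range(10)]
    with mp.Pool(4) as pool:
        # imap_unordered yields results in completion order,
        # so each one can be printed as soon as it is ready
        for result in pool.imap_unordered(cmd, data):
            print(result)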
EDIT - another implementation that gets each result immediately after it is calculated, using processes and queues instead of pool and map:
import multiprocessing
import random

def cmd(value, result_queue):
    # some CPU-heavy calculation
    for dummy in range(10 ** 8):
        random.random()
    # result
    result_queue.put("result for {}".format(value))

if __name__ == '__main__':
    data = [val for val in range(10)]
    results = multiprocessing.Queue()
    LIMIT = 3  # 3 is the number of processes (the number of CPU cores used)
    counter = 0
    for val in data:
        counter += 1
        multiprocessing.Process(
            target=cmd,
            kwargs={'value': val, 'result_queue': results}
        ).start()
        if counter >= LIMIT:
            print(results.get())
            counter -= 1
    for dummy in range(LIMIT - 1):
        print(results.get())
Output:
result for 0
result for 1
result for 2
result for 3
result for 4
result for 5
result for 7
result for 6
result for 8
result for 9
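Note that if, as in the question, cmd prints several lines itself and the only requirement is that one task's output is never interleaved with another's, a shared multiprocessing.Lock around the printing is another option. A minimal sketch (the two print lines per task are just for illustration):

import multiprocessing

def cmd(value, lock):
    # hold the lock for the whole output of one task so that
    # lines from different tasks never interleave
    with lock:
        print("first line of output for {}".format(value))
        print("second line of output for {}".format(value))

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    processes = [
        multiprocessing.Process(target=cmd, args=(val, lock))
        for val in range(10)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()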
Upvotes: 0
Reputation: 3801
If you're using Python 3, you can use concurrent.futures from the standard library instead.
Consider the following usage:
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor(100) as executor:
    for x in array:
        executor.submit(cmd, x)
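If cmd prints multiple lines, submitting it directly can still interleave output from different workers. One variant that avoids this is to have cmd return its output as a string and print it in the parent; a sketch, assuming a hypothetical cmd rewritten to return text:

import concurrent.futures

def cmd(x):
    # hypothetical variant of cmd that returns its output
    # as a string instead of printing it directly
    return "output for {}".format(x)

if __name__ == '__main__':
    array = range(100)
    with concurrent.futures.ProcessPoolExecutor(100) as executor:
        futures = [executor.submit(cmd, x) for x in array]
        # printing happens only in the parent process, in completion
        # order, so one task's output can never break into another's
        for future in concurrent.futures.as_completed(futures):
            print(future.result())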
Upvotes: 1
Reputation: 218
You can use threading from the standard library: https://docs.python.org/3/library/threading.html
import threading

lock = threading.Lock()

def cmd(x):
    lock.acquire(blocking=True)
    print(x)
    lock.release()

for i in range(100):
    t = threading.Thread(target=cmd, args=(i,))
    t.start()
Using a lock guarantees that the code between lock.acquire() and lock.release() is only executed by one thread at a time. The print function is already thread-safe in Python 3, so the output will not be broken up even without a lock. But if you have any state shared between threads (an object they modify), you need a lock.
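For example, a sketch of that shared-state case, with the lock used as a context manager (the counter is just an illustrative shared object):

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        # "with lock" acquires on entry and releases on exit,
        # even if the block raises an exception
        with lock:
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # always 400000 with the lock; often less without it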
Upvotes: 1