Reputation: 2915
For C++, we can use OpenMP to do parallel programming; however, OpenMP will not work for Python. What should I do if I want to parallelize parts of my Python program?
The structure of the code may be considered as:
solve1(A)
solve2(B)
where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of sequentially in order to reduce the running time?
The code is:
def solve(Q, G, n):
    i = 0
    tol = 10 ** -4
    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)
        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
        node1 = partition[0]
        node2 = partition[1]
        G = updateGraph(G, node1, node2)
        if i == 999:
            print "Maximum iteration reaches"
    print inneropt
where setinner and setouter are two independent functions. That's the part I want to parallelize...
Upvotes: 205
Views: 403497
Reputation: 21
You can't do CPU-bound parallel programming in Python using threads. You must use multiprocessing, or, if your work is mostly waiting on things like files or network packets, you can use async, await, and asyncio.
If you want threads, you can try Cython, but on Windows you must have Visual Studio installed with Python support and the developer pack as well.
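For instance, a minimal asyncio sketch for I/O-bound work; the two coroutines below are hypothetical placeholders that just sleep instead of doing real file or network I/O:
import asyncio

async def task_a():
    await asyncio.sleep(1)  # stands in for a file read or network request
    return "A done"

async def task_b():
    await asyncio.sleep(1)
    return "B done"

async def main():
    # Both coroutines wait concurrently, so this takes about 1 second, not 2.
    results = await asyncio.gather(task_a(), task_b())
    print(results)

asyncio.run(main())
Note that this only helps when the tasks spend their time waiting on I/O; for CPU-bound functions like the ones in the question, multiprocessing is the right tool.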
Upvotes: 0
Reputation: 1003
The following may suit you if you cannot invest the time to learn the requirements and assumptions of the libraries or modules recommended in the other answers:
To run n parts in parallel, launch them with child = subprocess.Popen(args=[sys.argv[0], ...]), providing the part number and other details in additional options and/or parameter files, and call child.wait() for each child. If you want to monitor progress, launch more workers as soon as workers finish, or do something else while waiting, use child.poll() instead of child.wait() and check whether child.returncode is still None.
For big tasks, the overhead of launching new processes and writing and reading files is minimal. For many small tasks one would want to launch the workers only once and then communicate with them via pipes or sockets, but that's a lot more work and has to be done carefully to avoid the possibility of deadlocks. In this situation, it is probably better to learn how to use the modules recommended in the other answers.
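As a rough sketch of that pattern under some assumptions (the --part option, N_PARTS, and the placeholder work are invented for illustration, not part of any library):
import subprocess
import sys
import time

N_PARTS = 4  # hypothetical number of independent parts

if "--part" in sys.argv:
    # Worker branch: do the work for this part, then exit.
    part = int(sys.argv[sys.argv.index("--part") + 1])
    print("working on part", part)
else:
    # Parent branch: re-launch this same script once per part.
    # sys.executable is used so the children run under the same interpreter.
    children = [
        subprocess.Popen([sys.executable, sys.argv[0], "--part", str(i)])
        for i in range(N_PARTS)
    ]
    while children:
        for child in list(children):
            if child.poll() is not None:  # finished?
                print("part done, return code:", child.returncode)
                children.remove(child)
        time.sleep(0.5)  # or do other useful work while waiting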
Upvotes: 2
Reputation: 725
Here is a complete example that works in a Windows environment; the advantage of asynchronous processing is the time saved:
import multiprocessing
import time
from multiprocessing import Pool, freeze_support


def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


if __name__ == '__main__':
    freeze_support()  # must be called first under the __main__ guard on Windows
    pool = Pool(multiprocessing.cpu_count())

    t0 = time.time()
    result1 = pool.apply_async(f1, [0])  # runs in a worker process
    result2 = pool.apply_async(f2, [9])  # runs in another worker process
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time() - t0)  # parallel: roughly the time of one call

    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time() - t0)  # sequential: roughly twice as long
Upvotes: 1
Reputation: 880
You can convert your DataFrame to a Dask DataFrame, and it can handle the parallel computing for you.
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"A": A, "B": B})
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)
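As a rough sketch of where the parallelism comes from (assuming the work can be expressed per partition; the column arithmetic below is just a placeholder for your own computation):
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"A": range(1000), "B": range(1000)})
ddf = dd.from_pandas(pdf, npartitions=3)

# Each partition is an ordinary pandas DataFrame; Dask evaluates the
# partitions in parallel when .compute() is called.
result = ddf.map_partitions(lambda part: part["A"] + part["B"]).compute()
print(result.head())
For pure-Python, CPU-bound work you may need a process-based scheduler (e.g. .compute(scheduler="processes")), since the default threaded scheduler is limited by the GIL.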
Upvotes: 0
Reputation: 31
I always use the 'multiprocessing' native library to handle parallelism in Python. To control the number of processes in the queue, I use a shared variable as a counter. In the following example, you can see how the parallel execution of simple processes works.
I made an update to the script to make it easier to use. Basically, the only thing you have to do is override the process method with the function you want to run in parallel. See the example; the procedure is very simple. Alternatively, you can also remove all the execution-log calls.
When I have some time, I'll update the code to work with processes that return values.
user@host:~$ pip install coloredlogs==15.0.1
Parallel processing script (copy and paste):
#!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """
    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60 * 60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        use_multithreading: bool
            Use multithreading instead of multiprocessing.
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        lock = manager.Lock()
        pool = Pool(processes=self._n_jobs) if not use_multithreading else ThreadPool(processes=self._n_jobs)
        start_time = datetime.now()

        with lock:  # Write-protecting the processes queue shared variable.
            for args in args_list:
                while True:
                    if self._queue.value < self._n_jobs:
                        self._queue.value += 1

                        # Running processes in parallel:
                        pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)

                        break
                    else:
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")
Example of use:
class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()
user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934
Upvotes: 5
Reputation: 151
You can use the joblib library to do parallel computation and multiprocessing.
from joblib import Parallel, delayed
You can simply create a function foo that you want to run in parallel and implement the parallel processing based on the following piece of code:
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
where num_cores can be obtained from the multiprocessing library as follows:
import multiprocessing
num_cores = multiprocessing.cpu_count()
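For instance, a self-contained version of the pattern above, where foo is just a stand-in squaring function rather than your real workload:
from joblib import Parallel, delayed
import multiprocessing

def foo(i):
    return i * i  # stand-in for your real work

inputs = [11, 32, 44, 55, 23, 0, 100]
num_cores = multiprocessing.cpu_count()
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in inputs)
print(output)  # [121, 1024, 1936, 3025, 529, 0, 10000]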
If you have a function with more than one input argument, and you just want to iterate over one of the arguments with a list, you can use the partial function from the functools library as follows:
from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
You can find a complete explanation of Python and R multiprocessing, with a couple of examples, here.
Upvotes: 6
Reputation: 31850
In some cases, it's possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:
from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s
Unfortunately, it seems that Numba only works with Numpy arrays, but not with other Python objects. In theory, it might also be possible to compile Python to C++ and then automatically parallelize it using the Intel C++ compiler, though I haven't tried this yet.
Upvotes: 4
Reputation: 96
The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there is also charm4py and mpi4py (I am the developer of charm4py).
There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to workers in each of the 1000 iterations. Since at least one worker will reside on a different process, this involves copying and sending the arguments to the other process(es). This could be very costly depending on the size of the objects. Instead, it makes sense to have workers store state and simply send the updated information.
For example, in charm4py this can be done like this:
class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B
        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...
Note that for this example we really only need one worker. The main loop could execute one of the functions and have the worker execute the other. But my code helps to illustrate a couple of things: while result_a.get() is blocked waiting on the result, worker A does the computation in the same process.
Upvotes: 7
Reputation: 3372
This can be done very elegantly with Ray.
To parallelize your example, you'd need to define your functions with the @ray.remote decorator and then invoke them with .remote.
import ray

ray.init()

# Define the functions.
@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])
There are a number of advantages of this approach over the multiprocessing module. For example, these function calls can be composed together:
@ray.remote
def f(x):
    return x + 1

x_id = f.remote(1)
y_id = f.remote(x_id)
z_id = f.remote(y_id)
ray.get(z_id)  # returns 4
Note that Ray is a framework I've been helping develop.
Upvotes: 59
Reputation: 40243
You can use the multiprocessing module. For this case I might use a processing pool:
from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
This will spawn processes that can do generic work for you. Since we did not pass the processes argument, it will spawn one process for each CPU core on your machine. Each CPU core can execute one process simultaneously.
If you want to map a list to a single function you would do this:
args = [A, B]
results = pool.map(solve1, args)
Don't use threads, because the GIL locks any operations on Python objects.
Upvotes: 222
Reputation: 2023
CPython uses the Global Interpreter Lock, which makes parallel programming a bit more interesting than in C++.
This topic has several useful examples and descriptions of the challenge:
Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?
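A rough, hedged illustration of what the GIL means in practice (the busy-loop and the iteration count here are arbitrary): two threads running pure-Python CPU-bound code take about as long as running it twice sequentially, while two processes can actually run at the same time.
from multiprocessing import Pool
from threading import Thread
import time

def count(n):
    # Pure-Python CPU-bound loop; the GIL lets only one thread run it at a time.
    while n > 0:
        n -= 1

if __name__ == "__main__":
    N = 10_000_000

    t0 = time.time()
    threads = [Thread(target=count, args=(N,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("2 threads:  ", time.time() - t0)

    t0 = time.time()
    with Pool(2) as pool:
        pool.map(count, [N, N])
    print("2 processes:", time.time() - t0)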
Upvotes: 6