Ronen Oren
Ronen Oren

Reputation: 31

Is there a way to utilize all cpu cores and power in calculations?

First, I've tried to use multithreading solution for this problem and discovered that it is not suitable for this purpose. Then I tried as community suggested to apply multiprocessing solution to bypass the GIL and even that performs poor compared to single process single thread code. Is python flawed in this domain? Is the only solution for heavy cpu calculations is to drop python for another language? I post my multiprocessing test code so you can get an impression.

from itertools import cycle
import random
import multiprocessing as mp
import time
# The class that represents the process
class Task(mp.Process):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
        mp.Process.__init__(self, group=group, target=target, name=name, args=args ,kwargs=kwargs, daemon=daemon)
        self.inputs = []

    def run(self):
        print(f"{self.name} is running")
        for arr in self.inputs:
            arr.sort()
    
    def add_input(self, arr):
        self.inputs.append(arr)


# A util function to cycle on iterable a finite number of times.
def finite_cycle(cycle_on, times):
    infinite_cycle = cycle(cycle_on)
    for _ in range(times):
        yield next(infinite_cycle)


# Constants
THOUSAND = 1000
MILION = THOUSAND ** 2
PCNT = 2
TASK_CNT = 50 * THOUSAND

# Main
def main():
    processes = [Task(name = f"p{pid}") for pid in range(PCNT)]
    for pid in finite_cycle(range(PCNT), TASK_CNT):
        processes[pid].add_input([random.randint(1,10) for _ in range(100)])
    stime = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"execution time: {round(time.time() - stime, 2)}")
    print("finish.")

And this is the single process single thread code which is faster for every varation of the constants.

def main():
    inputs = [[random.randint(1,10) for _ in range(100)] for _ in range(TASK_CNT)]
    stime = time.time()
    for arr in inputs:
        arr.sort()
    print(f"execution time: {round(time.time() - stime, 2)}")
    print("finish.")

Upvotes: 0

Views: 194

Answers (1)

Booboo
Booboo

Reputation: 44283

On my desktop the run methods averaged each approximately .125 seconds to run while the time elapsed between calling the first start method and the start of the first run method was approximately .23 seconds (i.e. 1628456465.1061594 - 1628456464.8741603), most of that time I believe taken by the serialization/de-serialization of self.inputs. See below, which is the original program with a few timings added.

The point is that multiprocessing has two sources of overhead that the non-multiprocessing program does not have:

  1. Overhead in creating the processes.
  2. Overhead in passing arguments to and getting results back from the process. This involves moving data from one address space to another (via various mechanisms) in many cases unless shared memory is being used.

Multiprocessing therefore only becomes advantageous when the processing itself (the run method in this case) is so CPU-intensive that the aforementioned costs of multiprocessing are offset by being able to "divide and conquer" the problem.

from itertools import cycle
import random
import multiprocessing as mp
import time

# The class that represents the process
class Task(mp.Process):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
        mp.Process.__init__(self, group=group, target=target, name=name, args=args ,kwargs=kwargs, daemon=daemon)
        self.inputs = []

    def run(self):
        t = time.time()
        print(f"{self.name} is running at:", t)
        for arr in self.inputs:
            arr.sort()
        print('elapsed time =', time.time() - t)

    def add_input(self, arr):
        self.inputs.append(arr)


# A util function to cycle on iterable a finite number of times.
def finite_cycle(cycle_on, times):
    infinite_cycle = cycle(cycle_on)
    for _ in range(times):
        yield next(infinite_cycle)


# Constants
THOUSAND = 1000
MILION = THOUSAND ** 2
PCNT = 2
TASK_CNT = 50 * THOUSAND

# Main
def main():
    processes = [Task(name = f"p{pid}") for pid in range(PCNT)]
    for pid in finite_cycle(range(PCNT), TASK_CNT):
        processes[pid].add_input([random.randint(1,10) for _ in range(100)])
    stime = time.time()
    print('stime =', stime)
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"execution time: {round(time.time() - stime, 2)}")
    print("finish.")

if __name__ == '__main__':
    main()

Prints:

stime = 1628456464.8741603
p0 is running at: 1628456465.1061594
elapsed time = 0.1320023536682129
p1 is running at: 1628456465.3201597
elapsed time = 0.11999750137329102
execution time: 0.62
finish.

Upvotes: 1

Related Questions