Reputation: 23
I have a program that I created using threads, but then I learned that threads don't run in parallel in Python, whereas processes do. As a result, I am trying to rewrite the program using multiprocessing, but I am having a hard time doing so. I have tried following several examples that show how to create processes and pools, but I don't think they do exactly what I want.
Below is my code with the attempts I have tried. The program tries to estimate the value of pi by randomly placing points on a graph that contains a circle. The program takes two command-line arguments: one is the number of threads/processes I want to create, and the other is the total number of points to try placing on the graph (N).
import math
import sys
from time import time
import concurrent.futures
import random
import multiprocessing as mp
def myThread(arg):
    # Take care of input argument
    n = int(arg)
    print("Thread received. n = ", n)
    # main calculation loop
    count = 0
    for i in range(0, n):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        d = math.sqrt(x * x + y * y)
        if d < 1:
            count = count + 1
    print("Thread found ", count, " points inside circle.")
    return count
# end myThread
# receive command line arguments
if (len(sys.argv) == 3):
    N = sys.argv[1]  # original ex: 0.01
    N = int(N)
    totalThreads = sys.argv[2]
    totalThreads = int(totalThreads)
    print("N = ", N)
    print("totalThreads = ", totalThreads)
else:
    print("Incorrect number of arguments!")
    sys.exit(1)
if ((totalThreads == 1) or (totalThreads == 2) or (totalThreads == 4) or (totalThreads == 8)):
    print()
else:
    print("Invalid number of threads. Please use 1, 2, 4, or 8 threads.")
    sys.exit(1)
# start experiment
t = int(time() * 1000) # begin run time
total = 0
# ATTEMPT 1
# processes = []
# for i in range(totalThreads):
#     process = mp.Process(target=myThread, args=(N/totalThreads))
#     processes.append(process)
#     process.start()
# for process in processes:
#     process.join()
# ATTEMPT 2
# pool = mp.Pool(mp.cpu_count())
# total = pool.map(myThread, [N/totalThreads])
# ATTEMPT 3
# for i in range(totalThreads):
#     total = total + pool.map(myThread, [N/totalThreads])
#     p = mp.Process(target=myThread, args=(N/totalThreads))
#     p.start()
# ATTEMPT 4
# with concurrent.futures.ThreadPoolExecutor() as executor:
#     for i in range(totalThreads):
#         future = executor.submit(myThread, N/totalThreads)  # start thread
#         total = total + future.result()  # get result
# analyze results
pi = 4 * total / N
print("pi estimate =", pi)
delta_time = int(time() * 1000) - t # calculate time required
print("Time =", delta_time, " milliseconds")
I thought that creating a loop from 0 to totalThreads that creates a process for each iteration would work. I also wanted to pass in N/totalThreads (to divide the work), but it seems that processes take in an iterable list rather than an argument to pass to the method.
What is it I am missing with multiprocessing? Is it at all possible to even do what I want to do with processes? Thank you in advance for any help, it is greatly appreciated :)
Upvotes: 0
Views: 234
Reputation:
I have simplified your code and used some hard-coded values which may or may not be reasonable.
import math
import concurrent.futures
import random
from datetime import datetime
def myThread(arg):
    count = 0
    for i in range(0, arg[0]):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        d = math.sqrt(x * x + y * y)
        if (d < 1):
            count += 1
    return count

N = 10_000
T = 8
_start = datetime.now()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(myThread, (int(N / T),)): _ for _ in range(T)}
    total = 0
    for future in concurrent.futures.as_completed(futures):
        total += future.result()
_end = datetime.now()
print(f'Estimate for PI = {4 * total / N}')
print(f'Run duration = {_end-_start}')
A typical output on my machine looks like this:
Estimate for PI = 3.1472
Run duration = 0:00:00.008895
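Since the question asks specifically about processes, here is a rough sketch of the same calculation using ProcessPoolExecutor instead of threads. In CPython, threads do not execute CPU-bound Python code in parallel because of the global interpreter lock, so processes are the usual way to spread this kind of loop over several cores. The names count_inside, split_work and estimate_pi are made up for illustration, the values for N and T are hard-coded assumptions as above, and comparing the squared distance instead of calling math.sqrt is my own substitution, not something taken from the original code.

```python
import random
from concurrent.futures import ProcessPoolExecutor


def count_inside(n):
    """Count how many of n random points in the unit square land inside the quarter circle."""
    rng = random.Random()  # fresh generator seeded by the OS, so workers do not share a sequence
    count = 0
    for _ in range(n):
        x = rng.random()
        y = rng.random()
        if x * x + y * y < 1.0:  # squared distance; equivalent to sqrt(...) < 1
            count += 1
    return count


def split_work(total_points, workers):
    """Split total_points into `workers` integer chunks whose sum is total_points."""
    base, extra = divmod(total_points, workers)
    return [base + 1 if i < extra else base for i in range(workers)]


def estimate_pi(total_points, workers):
    """Estimate pi by running one chunk of the work in each worker process."""
    chunks = split_work(total_points, workers)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # map() takes an iterable and calls count_inside once per element,
        # so the list needs one entry per chunk of work.
        total = sum(executor.map(count_inside, chunks))
    return 4 * total / total_points


if __name__ == "__main__":
    # The guard matters on platforms that start workers by importing this module again.
    N = 200_000  # hard-coded for illustration
    T = 4        # hard-coded for illustration
    print(f"Estimate for PI = {estimate_pi(N, T)}")
```

Two details relate to the attempts in the question. N/totalThreads is a float in Python 3, so integer division (here divmod) keeps the chunk sizes usable with range. And pool.map(myThread, [N/totalThreads]) passes a one-element list, so only one task runs; when using mp.Process directly, args must be a tuple, for example args=(n,) with the trailing comma.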
Bear in mind that the number of threads actually running is managed by the ThreadPoolExecutor (TPE) when it is constructed with no parameters. It caps the number of worker threads at a default derived from your machine's CPU count, not from T. Therefore you could, if you really wanted to, set T to a very high number: the TPE accepts every submitted task, queues the ones it cannot start yet, and runs each one as soon as a worker thread becomes free.
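To see the pool size limit in action, a small experiment along these lines may help. It submits more tasks than there are workers and counts how many distinct worker threads actually ran them. The helper names record_thread and distinct_workers are hypothetical, and the short sleep is only there to keep the workers busy long enough for the pool to spin up more than one thread. If I remember the documentation correctly, recent CPython versions default max_workers to min(32, CPU count + 4) when it is not given; the sketch sets it explicitly so the cap does not depend on the machine.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def record_thread(_):
    """Pretend to work briefly, then report which worker thread ran this task."""
    time.sleep(0.05)
    return threading.current_thread().name


def distinct_workers(tasks, max_workers):
    """Run `tasks` tasks on a pool and return how many different threads were used."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        names = list(executor.map(record_thread, range(tasks)))
    return len(set(names))


if __name__ == "__main__":
    # 20 tasks are accepted immediately, but at most 4 threads ever exist.
    print("worker threads used:", distinct_workers(tasks=20, max_workers=4))
```

The count never exceeds max_workers, regardless of how many tasks are submitted; the remaining tasks simply wait in the executor's internal queue.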
Upvotes: 1