Reputation: 623
I was trying the Python multiprocessing module to reduce the time of my filtering code. At the beginning I did some experiments, and the results are not promising.
I defined a function that runs a loop over a certain range, then ran this function with and without threading and measured the time. Here is my code:
import time
from multiprocessing.pool import ThreadPool

def do_loop(i, j):
    l = []
    for i in range(i, j):
        l.append(i)
    return l

# loop variable
x = 7

# without threading
start_time = time.time()
c = do_loop(0, 10**x)
print("--- %s seconds ---" % (time.time() - start_time))

# with threading
def thread_work(n):
    # dividing loop size
    a = 0
    b = int(n/2)
    c = int(n/2)
    # multiprocessing
    pool = ThreadPool(processes=10)
    async_result1 = pool.apply_async(do_loop, (a, b))
    async_result2 = pool.apply_async(do_loop, (b, c))
    async_result3 = pool.apply_async(do_loop, (c, n))
    # get the result from all workers
    result = async_result1.get() + async_result2.get() + async_result3.get()
    return result

start_time = time.time()
ll = thread_work(10**x)
print("--- %s seconds ---" % (time.time() - start_time))
For x=7 the result is:
--- 1.0931916236877441 seconds ---
--- 1.4213247299194336 seconds ---
Without threading it takes less time. And here is another problem: for x=8, most of the time I get a MemoryError with threading. Once I got this result:
--- 17.04124426841736 seconds ---
--- 32.871358156204224 seconds ---
The solution matters to me because I need to optimize a filtering task that takes 6 hours.
Upvotes: 2
Views: 3788
Reputation: 17322
Depending on your task, multiprocessing may or may not take longer. If you want to take advantage of your CPU cores and speed up your filtering process, then you should use multiprocessing.Pool, which:
offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism).
I've created an example of data filtering and measured the timing of a simple approach against the timing of a multiprocessing approach (starting from your code).
# take only the sentences that end in "we are what we dream" and whose second word is "are"
import time
from multiprocessing.pool import Pool

LEN_FILTER_SENTENCE = len('we are what we dream')
num_process = 10

def do_loop(sentences):
    l = []
    for sentence in sentences:
        if sentence[-LEN_FILTER_SENTENCE:].lower() == 'we are what we dream' and sentence.split()[1] == 'are':
            l.append(sentence)
    return l

# with multiprocessing
def thread_work(sentences):
    pool = Pool(processes=num_process)
    pool_food = (sentences[i: i + num_process] for i in range(0, len(sentences), num_process))
    result = pool.map(do_loop, pool_food)
    return result

def test(data_size=5, sentence_size=100):
    to_be_filtered = ['we are what we doing' * sentence_size] * 10 ** data_size + ['we are what we dream' * sentence_size] * 10 ** data_size
    start_time = time.time()
    c = do_loop(to_be_filtered)
    simple_time = (time.time() - start_time)
    start_time = time.time()
    ll = [e for l in thread_work(to_be_filtered) for e in l]
    multiprocessing_time = (time.time() - start_time)
    assert c == ll
    return simple_time, multiprocessing_time
data_size represents the length of your data and sentence_size is a multiplication factor for each data element; sentence_size is directly proportional to the amount of CPU work required for each item of your data.
data_size = [1, 2, 3, 4, 5, 6]
results = {i: {'simple_time': [], 'multiprocessing_time': []} for i in data_size}
sentence_size = list(range(1, 500, 100))
for size in data_size:
    for s_size in sentence_size:
        simple_time, multiprocessing_time = test(size, s_size)
        results[size]['simple_time'].append(simple_time)
        results[size]['multiprocessing_time'].append(multiprocessing_time)
import pandas as pd

df_small_data = pd.DataFrame({'simple_data_size_1': results[1]['simple_time'],
                              'simple_data_size_2': results[2]['simple_time'],
                              'simple_data_size_3': results[3]['simple_time'],
                              'multiprocessing_data_size_1': results[1]['multiprocessing_time'],
                              'multiprocessing_data_size_2': results[2]['multiprocessing_time'],
                              'multiprocessing_data_size_3': results[3]['multiprocessing_time'],
                              'sentence_size': sentence_size})

df_big_data = pd.DataFrame({'simple_data_size_4': results[4]['simple_time'],
                            'simple_data_size_5': results[5]['simple_time'],
                            'simple_data_size_6': results[6]['simple_time'],
                            'multiprocessing_data_size_4': results[4]['multiprocessing_time'],
                            'multiprocessing_data_size_5': results[5]['multiprocessing_time'],
                            'multiprocessing_data_size_6': results[6]['multiprocessing_time'],
                            'sentence_size': sentence_size})
Plotting the timing for small data:
ax = df_small_data.set_index('sentence_size').plot(figsize=(20, 10), title = 'Simple vs multiprocessing approach for small data')
ax.set_ylabel('Time in seconds')
Plotting the timing for big data (relatively big data):
As you can see, the power of multiprocessing shows up when you have big data that requires a relatively significant amount of CPU work for each data element.
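As a side note, here is a rough sketch of the same idea where Pool.map does the chunking for you via its chunksize argument instead of slicing the input yourself. The data, filter function and numbers below are only illustrative, not the benchmark above:

import time
from multiprocessing import Pool, cpu_count

FILTER = 'we are what we dream'

def keep(sentence):
    # same filtering rule as above, applied to a single sentence
    return sentence[-len(FILTER):].lower() == FILTER and sentence.split()[1] == 'are'

if __name__ == '__main__':
    sentences = ['we are what we dream' * 100] * 10**5 + ['nothing to keep' * 100] * 10**5
    start = time.time()
    with Pool(processes=cpu_count()) as pool:
        # chunksize controls how many sentences each worker receives per task
        flags = pool.map(keep, sentences, chunksize=10_000)
    kept = [s for s, ok in zip(sentences, flags) if ok]
    print("--- %s seconds ---" % (time.time() - start))

Larger chunks mean fewer pickling round-trips between the parent and the workers, which usually matters a lot for cheap per-item work.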
Upvotes: 6
Reputation: 1002
Why are you using the multiprocessing module to create threads?
It is best to create multiple thread instances, give each of them its task, start them all, and wait until they finish, collecting the results into a list in the meantime.
From my experience (with one particular task), I have found that even creating the whole graph of threads at the start gives less overhead than creating each thread right before starting the task at the next node in the graph. I mean this for 10, 100, 1000, or 10000 threads. Just make sure the threads sleep while idle, e.g. time.sleep(0.5), to avoid wasting CPU cycles.
With threads you can share lists, dictionaries, and queues; queue.Queue is thread-safe, and single operations on lists and dictionaries are atomic thanks to the GIL.
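A minimal sketch of that pattern (the worker function and chunk boundaries here are only for illustration): create the threads up front, start them all, join them, and collect the results through a thread-safe queue.Queue.

import threading
import queue

def do_loop(i, j, out):
    # push this worker's chunk of results onto the shared queue
    out.put(list(range(i, j)))

results = queue.Queue()
n = 10**7
bounds = [(0, n // 3), (n // 3, 2 * n // 3), (2 * n // 3, n)]

threads = [threading.Thread(target=do_loop, args=(a, b, results)) for a, b in bounds]
for t in threads:
    t.start()
for t in threads:
    t.join()

# all producers are done, so the queue can be drained safely
collected = []
while not results.empty():
    collected.extend(results.get())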
Upvotes: 1
Reputation: 1044
Aroosh Rana may have the best answer, but when testing with that approach there are a couple of things to watch out for. The way you grow your list in the loop can be very inefficient; consider allocating it at its full size up front. Also, look closely at how you divided up the work: you have two loops that process half the array and one that goes from n/2 to n/2. And, as mentioned elsewhere, the work done per item is rather trivial and wouldn't benefit from parallel processing. I've tried to improve upon your previous test.
import time
from multiprocessing.pool import ThreadPool
import math

def do_loop(array, i, j):
    for k in range(i, j):
        array[k] = math.cos(1/(1+k))
    return array

# loop variable
x = 7
array_size = 2*10**x

# without threading
start_time = time.time()
array = [0]*array_size
c = do_loop(array, 0, array_size)
print("--- %s seconds ---" % (time.time() - start_time))

# with threading
def thread_work(n):
    # dividing loop size
    array = [0]*n
    a = 0
    b = int(n/3)
    c = int(2*n/3)
    # thread pool
    pool = ThreadPool(processes=4)
    async_result1 = pool.apply_async(do_loop, (array, a, b))
    async_result2 = pool.apply_async(do_loop, (array, b, c))
    async_result3 = pool.apply_async(do_loop, (array, c, n))
    # get the result from all workers
    result1 = async_result1.get()
    result2 = async_result2.get()
    result3 = async_result3.get()
    # time only the final concatenation of the results
    start_time = time.time()
    result = result1 + result2 + result3
    print("--- %s seconds ---" % (time.time() - start_time))
    return result

start_time = time.time()
ll = thread_work(array_size)
print("--- %s seconds ---" % (time.time() - start_time))
Also keep in mind that with an approach like this you wouldn't have to combine the results at the end, since each thread is working on the same array.
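For illustration, a stripped-down sketch of that idea (the worker and sizes here are only examples): every worker fills its own slice of one shared list, so the filled list can be used directly with no concatenation step.

import math
from multiprocessing.pool import ThreadPool

def do_loop(array, i, j):
    # fill the slice [i, j) of the shared list in place
    for k in range(i, j):
        array[k] = math.cos(1 / (1 + k))

n = 2 * 10**7
array = [0] * n
pool = ThreadPool(processes=4)
jobs = [pool.apply_async(do_loop, (array, a, b))
        for a, b in [(0, n // 3), (n // 3, 2 * n // 3), (2 * n // 3, n)]]
for job in jobs:
    job.get()      # just wait; every worker wrote into the same list
# `array` is already fully populated; no result concatenation is needed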
Upvotes: 1
Reputation: 126
It's better to use multiprocessing.Process(), because Python has a Global Interpreter Lock (GIL). So even if you create threads to speed up your tasks, they won't run in parallel; they execute one at a time. You can refer to the Python docs on the GIL and threading.
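A minimal sketch of how that looks with multiprocessing.Process (the work function and numbers are just illustrative): each process runs in its own interpreter, so the GIL does not serialize them.

import math
from multiprocessing import Process, Queue

def work(i, j, out):
    # do the CPU-bound work for one slice and report a single result
    out.put(sum(math.cos(1 / (1 + k)) for k in range(i, j)))

if __name__ == '__main__':
    out = Queue()
    n = 10**7
    procs = [Process(target=work, args=(a, b, out))
             for a, b in [(0, n // 2), (n // 2, n)]]
    for p in procs:
        p.start()
    total = sum(out.get() for _ in procs)   # drain the queue before joining
    for p in procs:
        p.join()
    print(total)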
Upvotes: 1
Reputation: 189387
The task here is so small that the parallelization overhead completely dominates the benefit. This is a common FAQ.
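A quick way to see this for yourself (a sketch; the sizes are arbitrary): time a trivial per-item task with and without a pool, and the pool will typically lose because of process startup and pickling overhead.

import time
from multiprocessing import Pool

def trivial(i):
    return i          # almost no work per item

if __name__ == '__main__':
    data = range(10**6)

    start = time.time()
    serial = [trivial(i) for i in data]
    print("serial:   %.3f s" % (time.time() - start))

    start = time.time()
    with Pool(4) as pool:
        parallel = pool.map(trivial, data, chunksize=10_000)
    print("parallel: %.3f s" % (time.time() - start))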
Upvotes: 2