Reputation: 189
I have a for loop below that runs a few functions and appends the result to a list. The list 'results' will end up with 12 elements (3 functions x 4 loop iterations = 12).
How can I parallelize this? Since my loop will iterate 4 times, does that mean I can only use 4 threads/cores?
results = []
for points in [250, 500, 1000, 4000]:
    df_dx, db_dy = simulate_cointegration(num_points=points, stax=0.0100, stay=0.0050,
                                          coex=0.0200, coey=0.0200)
    johansen_results = johansen_test(df_dx, db_dy)
    cadf_results = cadf_test(df_dx, db_dy)
    po_results = po_test(df_dx, db_dy)
    results.append(johansen_results)
    results.append(cadf_results)
    results.append(po_results)
Upvotes: 0
Views: 486
Reputation: 549
This is a basic answer showing how you can parallelize your computations. Note Thymen's comment explaining that this answer is limited to a single core (Python threads share one interpreter because of the GIL). So if you can run 4 threads on your core, you'll potentially go up to 4 times faster (surely less in reality).
import threading

results = []

# 1. prepare the "point tasks"; each one will create & start 3 test threads
test_threads = []

def point_task(points):
    # you have to do this computation once before you can run your 3 test threads
    df_dx, db_dy = simulate_cointegration(num_points=points, stax=0.0100, stay=0.0050,
                                          coex=0.0200, coey=0.0200)
    # each thread will do some computation & append its result
    thread1 = threading.Thread(target=lambda dx, dy: results.append(johansen_test(dx, dy)), args=(df_dx, db_dy))
    thread2 = threading.Thread(target=lambda dx, dy: results.append(cadf_test(dx, dy)), args=(df_dx, db_dy))
    thread3 = threading.Thread(target=lambda dx, dy: results.append(po_test(dx, dy)), args=(df_dx, db_dy))
    # remember the three threads so we can call thread.join() on them at the end
    test_threads.append(thread1)
    test_threads.append(thread2)
    test_threads.append(thread3)
    # then start all the tasks while you prepare the others
    thread1.start()
    thread2.start()
    thread3.start()

# 2. create one thread per point value; each thread will create three new threads
points_threads = []
for points in [250, 500, 1000, 4000]:
    thread = threading.Thread(target=point_task, args=(points,))
    points_threads.append(thread)

# 3. start all the threads
for thread in points_threads:
    thread.start()

# 4. make sure the first four threads are finished before joining the test threads:
#    if one of the first threads is not over yet, a thread might still be missing
#    from 'test_threads' and you would not be able to call '.join()' on it
for thread in points_threads:
    thread.join()
for thread in test_threads:
    thread.join()

print(results)
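The same fan-out can also be written more compactly with `concurrent.futures.ThreadPoolExecutor` from the standard library, which creates and joins the threads for you. A sketch (it includes the stub functions so it runs standalone; swap in your real functions):

```python
from concurrent.futures import ThreadPoolExecutor

# stub implementations, as in this answer; replace with the real functions
def simulate_cointegration(num_points=100, stax=0.0100, stay=0.0050,
                           coex=0.0200, coey=0.0200):
    return 1, 1

def johansen_test(dx, dy):
    return 1

def cadf_test(dx, dy):
    return 2

def po_test(dx, dy):
    return 3

def run_tests(points):
    # run the simulation once, then the three tests on its output
    df_dx, db_dy = simulate_cointegration(num_points=points, stax=0.0100,
                                          stay=0.0050, coex=0.0200, coey=0.0200)
    return [johansen_test(df_dx, db_dy),
            cadf_test(df_dx, db_dy),
            po_test(df_dx, db_dy)]

with ThreadPoolExecutor() as executor:
    # unlike the hand-rolled version, executor.map keeps the results
    # in the same order as the input point values
    results = [r for chunk in executor.map(run_tests, [250, 500, 1000, 4000])
               for r in chunk]
```

A nice side effect is that the result order is deterministic, since `map` returns the chunks in input order.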
So if we use a simple stub implementation of the other methods:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

def simulate_cointegration(num_points=100, stax=0.0100, stay=0.0050, coex=0.0200, coey=0.0200):
    return 1, 1

def johansen_test(dx, dy):
    return 1

def cadf_test(dx, dy):
    return 2

def po_test(dx, dy):
    return 3
Then you get results like the following (the exact order may vary between runs, since the threads race to append to the list):
print(results)
>>> [1, 2, 1, 3, 1, 2, 1, 3, 2, 3, 2, 3]
If you wish to use more cores, see multiprocessing or MPI.
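For example, with `multiprocessing.Pool` the structure stays almost the same, except the work is split across worker processes instead of threads (a sketch; it assumes the functions are defined at module level so they can be pickled, and includes the stubs so it runs standalone):

```python
from multiprocessing import Pool

# stub implementations, as in this answer; replace with the real functions
def simulate_cointegration(num_points=100, stax=0.0100, stay=0.0050,
                           coex=0.0200, coey=0.0200):
    return 1, 1

def johansen_test(dx, dy):
    return 1

def cadf_test(dx, dy):
    return 2

def po_test(dx, dy):
    return 3

def run_tests(points):
    # each worker process runs the simulation and the three tests for one point value
    df_dx, db_dy = simulate_cointegration(num_points=points, stax=0.0100,
                                          stay=0.0050, coex=0.0200, coey=0.0200)
    return [johansen_test(df_dx, df_dy := db_dy),
            cadf_test(df_dx, db_dy),
            po_test(df_dx, db_dy)]

if __name__ == "__main__":
    with Pool(processes=4) as pool:  # one worker process per point value
        # pool.map keeps the results in input order
        results = [r for chunk in pool.map(run_tests, [250, 500, 1000, 4000])
                   for r in chunk]
```

The `if __name__ == "__main__":` guard matters here: without it, each worker process can re-execute the pool creation on platforms that spawn fresh interpreters.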
Also, I am not sure how Python manages shared resources, but you might want to use a lock around your results list, or replace it with a queue, as good practice.
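A minimal sketch of the lock variant (the `append_result` helper is hypothetical, just to illustrate the pattern):

```python
import threading

results = []
results_lock = threading.Lock()

def append_result(value):
    # take the lock so only one thread mutates the shared list at a time
    with results_lock:
        results.append(value)

# illustrative usage: 12 threads, each appending one result
threads = [threading.Thread(target=append_result, args=(i,)) for i in range(12)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```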
Edit: just found this answer: Are lists thread-safe? So you should be able to keep your list as a simple Python list, as long as you stick to the threading module and do not use multiprocessing or something else.
Upvotes: 1