MilTom
MilTom

Reputation: 189

How to properly parallelize and append a for loop?

I have a for loop below that runs a few functions and appends the result to a list. The list 'results' will end up with 12 elements (3 functions x 4 loop iterations = 12).

How can I parallelize this? Since my loop will iterate 4 times, does that mean I can only use 4 threads/cores?

results = []
for points in [250, 500, 1000, 4000]:

    df_dx, db_dy = simulate_cointegration(num_points=points, stax=0.0100, stay=0.0050, coex=0.0200, 
                                           coey=0.0200)

    johansen_results = johansen_test(df_dx, db_dy)
    cadf_results = cadf_test(df_dx, db_dy) 
    po_results = po_test(df_dx, db_dy) 

    results.append(johansen_results)
    results.append(cadf_results)
    results.append(po_results)

Upvotes: 0

Views: 486

Answers (1)

SpaceBurger
SpaceBurger

Reputation: 549

This is a basic answer to how you can parallelize your computations. Note Thymen's comment explaining that this answer is limited to the usage of 1 core. So if you can run 4 threads on your core, you'll potentially go 4 times faster (surely less in reality).

import threading

results = []

# 1. create & start the first 4 threads (one per point value)
test_threads = []
def point_task(points):
    # you have to do this computation once before you can run your 3 test threads
    df_dx, db_dy = simulate_cointegration(num_points=points, stax=0.0100, stay=0.0050, coex=0.0200, coey=0.0200)

    # each thread will do some computation & appends the results
    thread1 = threading.Thread(target=lambda dx, dy: results.append(johansen_test(dx, dy)), args=(df_dx, db_dy))
    thread2 = threading.Thread(target=lambda dx, dy: results.append(cadf_test(dx, dy)), args=(df_dx, db_dy))
    thread3 = threading.Thread(target=lambda dx, dy: results.append(po_test(dx, dy)), args=(df_dx, db_dy))

    # remember the three tasks to call thread.join() at the end
    test_threads.append(thread1)
    test_threads.append(thread2)
    test_threads.append(thread3)

    # then start all the tasks while you prepare the others
    thread1.start()
    thread2.start()
    thread3.start()

# 2. create one thread per point value, each thread will create three new threads
points_threads = []
for points in [250, 500, 1000, 4000]:
    thread = threading.Thread(target=point_task, args=(points,))
    points_threads.append(thread)

# 3. start all the threads
for thread in points_threads:
    thread.start()

# 4. make sure the first four threads are finished before making sure the test_threads are over as well
#    this is because if one of the first threads is no over yet, then you might have a missing thread in 'tests_threads' on which you won't be able to call '.join()'
for thread in points_threads:
    thread.join()
for thread in tests_threads:
    thread.join()

print(results)

So if use a simple implementation of the other methods :

#!/usr/bin/env python
# -*- coding: utf-8 -*-

def simulate_cointegration(num_points=100, stax=0.0100, stay=0.0050, coex=0.0200, coey=0.0200):
    return 1, 1

def johansen_test(dx, dy):
    return 1

def cadf_test(dx, dy):
    return 2

def po_test(dx, dy):
    return 3

Then you get the following results:

print(results)
>>> [1, 2, 1, 3, 1, 2, 1, 3, 2, 3, 2, 3]

If you wish to use more cores, see multiprocessing or MPI.

Also, I am not sure how Python manages the critical resources, but you might want to use locks on your results list or replace it with queues for good practice.

Edit : just found this answer Are lists thread-safe?. So you should be able to keep your list as a simple python list, as long as you keep to the threading module and do not use multiprocessing or something else.

Upvotes: 1

Related Questions