user3889486

Reputation: 656

Parallelizing a for loop in python

I have a dictionary where each key (a date) maps to a table: a list of rows of the form [day1, val11, val21], [day2, val12, val22], [day3, val13, val23], .... I want to transform it into a DataFrame; this is done with the following code:

df4 = pd.DataFrame(columns=sorted(set_days))

for date in dic.keys():
    days = [day  for day, val1, val2  in dic[date]]
    val1 = [val1 for day, val1, val2  in dic[date]]
    df4.loc[date, days] = val1

This code works fine, but it takes more than two hours to run. After some research, I realized I could parallelize it with the multiprocessing library; the following code is the intended parallel version:

import multiprocessing

def func(date):
    global df4, dic
    days = [day  for day, val1, val2  in dic[date]]
    val1 = [val1 for day, val1, val2  in dic[date]]
    df4.loc[date, days] = val1

multiprocessing.Pool(processes=8).map(func, dic.keys())

The problem with this code is that, after executing multiprocessing.Pool(processes..., the df4 DataFrame is empty.

Any help would be much appreciated.

Example

Suppose the dictionary contains two days:

dic['20030812'][:4]
Out: [[1, 24.25, 0.0], [20, 23.54, 23.54], [30, 23.13, 24.36], [50, 22.85, 23.57]]

dic['20030813'][:4]
Out: [[1, 24.23, 0.0], [19, 23.4, 22.82], [30, 22.97, 24.19], [49, 22.74, 23.25]]

then the DataFrame should be of the form:

df4.loc[:, 1:50]
             1    2    3    4    5   ...     46   47   48     49     50
20030812  24.25  NaN  NaN  NaN  NaN  ...    NaN  NaN  NaN    NaN  22.85
20030813  24.23  NaN  NaN  NaN  NaN  ...    NaN  NaN  NaN  22.74    NaN

Also,

dic.keys()
Out[36]: dict_keys(['20030812', '20030813'])

df1.head().to_dict()
Out: 
{1: {'20030812': 24.25, '20030813': 24.23},
 2: {'20030812': nan, '20030813': nan},
 3: {'20030812': nan, '20030813': nan},
 4: {'20030812': nan, '20030813': nan},
 5: {'20030812': nan, '20030813': nan},
 6: {'20030812': nan, '20030813': nan},
 7: {'20030812': nan, '20030813': nan},
 8: {'20030812': nan, '20030813': nan},
 9: {'20030812': nan, '20030813': nan},
 10: {'20030812': nan, '20030813': nan},
 11: {'20030812': nan, '20030813': nan},
 12: {'20030812': nan, '20030813': nan},
 13: {'20030812': nan, '20030813': nan},
 14: {'20030812': nan, '20030813': nan},
 15: {'20030812': nan, '20030813': nan},
 16: {'20030812': nan, '20030813': nan},
 17: {'20030812': nan, '20030813': nan},
 18: {'20030812': nan, '20030813': nan},
 19: {'20030812': nan, '20030813': 23.4},
 20: {'20030812': 23.54, '20030813': nan},
 21: {'20030812': nan, '20030813': nan},
 22: {'20030812': nan, '20030813': nan},
 23: {'20030812': nan, '20030813': nan},
 24: {'20030812': nan, '20030813': nan},
 25: {'20030812': nan, '20030813': nan},
 26: {'20030812': nan, '20030813': nan},
 27: {'20030812': nan, '20030813': nan},
 28: {'20030812': nan, '20030813': nan},
 29: {'20030812': nan, '20030813': nan},
 30: {'20030812': 23.13, '20030813': 22.97},
 31: {'20030812': nan, '20030813': nan},
 32: {'20030812': nan, '20030813': nan},
 ...

Upvotes: 3

Views: 243

Answers (2)

ShadowRanger

Reputation: 155323

To answer your original question (roughly: "Why is the df4 DataFrame empty?"), the reason this doesn't work is that when the Pool workers are launched, each worker inherits a personal copy-on-write view of the parent's data (either directly if multiprocessing is running on a UNIX-like system with fork, or via a kludgy approach to simulate it when running on Windows).

Thus, when each worker does:

 df4.loc[date, days] = val1

it's mutating the worker's personal copy of df4; the parent process's copy remains untouched.
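
As a minimal sketch of that behavior, the snippet below swaps df4 for a plain module-level dict so it runs without pandas. The names results, record and run_demo are made up for this illustration and are not part of the question's code. It assumes a UNIX-like system where the fork start method is available, and requests it explicitly.

```python
import multiprocessing

# Module-level state; every forked worker gets its own copy of it.
results = {}


def record(key):
    # This mutates the copy living in the worker process, not the parent's.
    results[key] = key * 2
    return len(results)


def run_demo():
    # Assumption: the "fork" start method exists (UNIX-like systems only).
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=2) as pool:
        sizes = pool.map(record, [1, 2, 3, 4])
    # Return what the workers saw and what the parent still holds.
    return sizes, dict(results)


if __name__ == "__main__":
    worker_sizes, parent_view = run_demo()
    print(worker_sizes)  # sizes of the workers' private dicts, each at least 1
    print(parent_view)   # the parent's dict is still empty
```

Each worker reports a non-empty dict, yet the parent's results is unchanged, which is the same thing that happens to df4.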

In general, there are three ways to handle this:

  1. Change your worker function to return something that can be used in the parent process. For example, instead of trying to perform in-place mutation with df4.loc[date, days] = val1, return what's necessary to do it in the parent, e.g. return date, days, val1, then change the parent to:

    for date, days, val in multiprocessing.Pool(processes=8).map(func, dic.keys()):
        df4.loc[date, days] = val
    

    The downside of this approach is that each return value must be pickled (Python's version of serialization), piped from child to parent, and unpickled. If the worker does little work per task, and especially if the return values are large (as appears to be the case here), the program can easily spend more time on serialization and IPC than it gains from parallelism.

  2. Use shared objects/memory (demonstrated in this answer to "Multiprocessing writing to pandas dataframe"). In practice, this usually doesn't gain you much: anything that isn't built on the more "raw" ctypes sharing in multiprocessing.sharedctypes still ends up piping data from one process to another. sharedctypes-based approaches can get a meaningful speed boost, though, since once mapped, shared raw C arrays are nearly as fast to access as local memory.

  3. If the work being parallelized is I/O bound, or uses third party C extensions for CPU bound work (e.g. numpy), you may be able to get the required speed boosts from threads, despite GIL interference, and threads do share the same memory. Your case doesn't appear to be either I/O bound or meaningfully dependent on third party C extensions which might release the GIL, so it probably won't help here, but in general, the simple way to switch from process-based parallelism to thread-based parallelism (when you're already using multiprocessing) is to change the import from:

    import multiprocessing
    

    to

    import multiprocessing.dummy as multiprocessing
    

    which imports the thread-backed version of multiprocessing under the expected name, so code seamlessly switches from using processes to threads.
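
Approach 1 might look roughly like the following sketch. extract and build_frame are hypothetical names, the sample data is copied from the question, and the column set is derived from the data here rather than from the question's set_days. The worker receives a (date, table) pair instead of reading a global dic; that is a variation for this sketch so it does not depend on how the workers were started. Because build_frame only needs an object with a map method, the same code accepts either a process-backed multiprocessing.Pool or the thread-backed multiprocessing.dummy.Pool from approach 3.

```python
import multiprocessing
import multiprocessing.dummy

import pandas as pd


def extract(item):
    # Runs in a worker: reads its input and returns plain lists, no mutation.
    date, table = item
    days = [day for day, val1, val2 in table]
    vals = [val1 for day, val1, val2 in table]
    return date, days, vals


def build_frame(dic, pool):
    # All mutation of the DataFrame happens here, in the parent.
    all_days = sorted({row[0] for table in dic.values() for row in table})
    df4 = pd.DataFrame(columns=all_days)
    for date, days, vals in pool.map(extract, list(dic.items())):
        df4.loc[date, days] = vals
    return df4


# Sample data copied from the question.
dic = {
    "20030812": [[1, 24.25, 0.0], [20, 23.54, 23.54],
                 [30, 23.13, 24.36], [50, 22.85, 23.57]],
    "20030813": [[1, 24.23, 0.0], [19, 23.4, 22.82],
                 [30, 22.97, 24.19], [49, 22.74, 23.25]],
}

if __name__ == "__main__":
    # Process-backed pool: results are pickled and sent back to the parent.
    with multiprocessing.Pool(processes=2) as pool:
        print(build_frame(dic, pool))
    # Thread-backed pool: same interface, no pickling, but subject to the GIL.
    with multiprocessing.dummy.Pool(2) as pool:
        print(build_frame(dic, pool))
```

Whether this beats the serial loop depends on the data; for work this light, the pickling cost described under approach 1 may well dominate.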

Upvotes: 1

user3889486

Reputation: 656

As RafaelC hinted, it was an XY problem. I was able to reduce the execution time to 20 seconds without multiprocessing.

I created a list, lista, that replaces the dictionary. Rather than adding a row to the df4 DataFrame for each date, I fill lista first and then convert it into a DataFrame in a single step.

import numpy as np
import pandas as pd

# Returns the largest day across all dates (each date has a different number of days)
def longest_series(dic):
    largest_series = 0
    for date in dic.keys():
        # day number of the last row in this date's table (this relies on rows being sorted by day)
        current_series = dic[date][-1][0]
        if largest_series < current_series:
            largest_series = current_series
    return largest_series


ls = longest_series(dic)
l_total_days = list(range(1, ls+1))
s_total_days = set(l_total_days)

# Build lista, which is similar to dic. The difference is that, in lista,
# every date has the same number of days (1 to ls), and the dates themselves
# are not stored.

# It takes 15 seconds
lista = list()
for date in dic.keys():
    present_days = list()
    present_values = list()
    for day, val_252, _ in dic[date]:
        present_days.append(day)
        present_values.append(val_252)

    missing_days = list(s_total_days.difference(set(present_days))) # days absent for this date
    missing_values = [None] * len(missing_days)                     # placeholder values for those days
    all_days_index = list(np.argsort(present_days + missing_days))  # permutation that sorts the days, keeping values aligned
    all_day_values = present_values + missing_values
    lista.append(list(np.array(all_day_values)[all_days_index]))


# It takes 4 seconds
df = pd.DataFrame(lista, index=list(dic.keys()), columns=l_total_days)
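
A more compact variant of the same build-once idea, offered only as a sketch: collect one {day: value} dict per date and let pandas align the columns. frame_from_dict is a hypothetical name, the sample data is copied from the question, and the input is assumed to be non-empty. Days with no entry for a date come out as NaN.

```python
import pandas as pd


def frame_from_dict(dic):
    # One {day: value} mapping per date; absent days simply have no key.
    rows = {
        date: {day: val1 for day, val1, _ in table}
        for date, table in dic.items()
    }
    largest_day = max(day for row in rows.values() for day in row)
    # Build the frame once, then add the missing day columns in sorted order.
    df = pd.DataFrame.from_dict(rows, orient="index")
    return df.reindex(index=list(rows), columns=list(range(1, largest_day + 1)))


# Sample data copied from the question.
dic = {
    "20030812": [[1, 24.25, 0.0], [20, 23.54, 23.54],
                 [30, 23.13, 24.36], [50, 22.85, 23.57]],
    "20030813": [[1, 24.23, 0.0], [19, 23.4, 22.82],
                 [30, 22.97, 24.19], [49, 22.74, 23.25]],
}

df = frame_from_dict(dic)
print(df.loc[:, [1, 19, 20, 30, 49, 50]])
```

I have not timed this against the version above, so treat any speed difference as something to measure on the real data.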

Upvotes: 0
