Reputation: 656
I have a dictionary where each key (date) contains a table (multiple lists of the format[day1, val11, val21], [day2, va12, val22], [day3, val13, val23], ...
. I want to transform it into a DataFrame; this is done with the following code:
df4 = pd.DataFrame(columns=sorted(set_days))
for date in dic.keys():
days = [day for day, val1, val2 in dic[date]]
val1 = [val1 for day, val1, val2 in dic[date]]
df4.loc[date, days] = val1
This code works fine, but it takes more than two hours to run.
After some research, I've realized I could parallelize it via the multiprocessing
library; the following code is the intended parallel version
import multiprocessing
def func(date):
global df4, dic
days = [day for day, val1, val2 in dic[date]]
val1 = [val1 for day, val1, val2 in dic[date]]
df4.loc[date, days] = val1
multiprocessing.Pool(processes=8).map(func, dic.keys())
The problem with this code is that, after executing multiprocessing.Pool(processes...
, the df4
DataFrame is empty.
Any help would be much appreciated.
Example
Suppose the dictionary contains two days:
dic['20030812'][:4]
Out: [[1, 24.25, 0.0], [20, 23.54, 23.54], [30, 23.13, 24.36], [50, 22.85, 23.57]]
dic['20030813'][:4]
Out: [[1, 24.23, 0.0], [19, 23.4, 22.82], [30, 22.97, 24.19], [49, 22.74, 23.25]]
then the DataFrame should be of the form:
df4.loc[:, 1:50]
1 2 3 4 5 ... 46 47 48 49 50
20030812 24.25 NaN NaN NaN NaN ... NaN NaN NaN NaN 22.85
20030813 24.23 NaN NaN NaN NaN ... NaN NaN NaN 22.74 NaN
Also,
dic.keys()
Out[36]: dict_keys(['20030812', '20030813'])
df1.head().to_dict()
Out:
{1: {'20030812': 24.25, '20030813': 24.23},
2: {'20030812': nan, '20030813': nan},
3: {'20030812': nan, '20030813': nan},
4: {'20030812': nan, '20030813': nan},
5: {'20030812': nan, '20030813': nan},
6: {'20030812': nan, '20030813': nan},
7: {'20030812': nan, '20030813': nan},
8: {'20030812': nan, '20030813': nan},
9: {'20030812': nan, '20030813': nan},
10: {'20030812': nan, '20030813': nan},
11: {'20030812': nan, '20030813': nan},
12: {'20030812': nan, '20030813': nan},
13: {'20030812': nan, '20030813': nan},
14: {'20030812': nan, '20030813': nan},
15: {'20030812': nan, '20030813': nan},
16: {'20030812': nan, '20030813': nan},
17: {'20030812': nan, '20030813': nan},
18: {'20030812': nan, '20030813': nan},
19: {'20030812': nan, '20030813': 23.4},
20: {'20030812': 23.54, '20030813': nan},
21: {'20030812': nan, '20030813': nan},
22: {'20030812': nan, '20030813': nan},
23: {'20030812': nan, '20030813': nan},
24: {'20030812': nan, '20030813': nan},
25: {'20030812': nan, '20030813': nan},
26: {'20030812': nan, '20030813': nan},
27: {'20030812': nan, '20030813': nan},
28: {'20030812': nan, '20030813': nan},
29: {'20030812': nan, '20030813': nan},
30: {'20030812': 23.13, '20030813': 22.97},
31: {'20030812': nan, '20030813': nan},
32: {'20030812': nan, '20030813': nan},
...
Upvotes: 3
Views: 243
Reputation: 155323
To answer your original question (roughly: "Why is the df4
DataFrame empty?"), the reason this doesn't work is that when the Pool
workers are launched, each worker inherits a personal copy-on-write view of the parent's data (either directly if multiprocessing
is running on a UNIX-like system with fork
, or via a kludgy approach to simulate it when running on Windows).
Thus, when each worker does:
df4.loc[date, days] = val1
it's mutating the worker's personal copy of df4
; the parent process's copy remains untouched.
In general, there are three ways to handle this:
Change your worker function to return something that can be used in the parent process. For example, instead of trying to perform in-place mutation with df4.loc[date, days] = val1
, return what's necessary to do it in the parent, e.g. return date, days, val1
, then change the parent to:
for date, days, val in multiprocessing.Pool(processes=8).map(func, dic.keys()):
df4.loc[date, days] = val
Downside to this approach is that it requires each of the return values to be pickled (Python's version of serialization), piped from child to parent, and unpickled; if the worker task doesn't do very much work, especially if the return values are large (and in this case, that seems to be the case), it can easily spend more time on serialization and IPC than it gains in parallelism.
Using shared object/memory (demonstrated in this answer to "Multiprocessing writing to pandas dataframe"). In practice, this usually doesn't gain you much, since stuff that isn't based on the more "raw" ctypes
sharing using multiprocessing.sharedctypes
is still ultimately going to end up needing to pipe data from one process to another; sharedctypes
based stuff can get a meaningful speed boost though, since once mapped, shared raw C arrays are nearly as fast to access as local memory.
If the work being parallelized is I/O bound, or uses third party C extensions for CPU bound work (e.g. numpy
), you may be able to get the required speed boosts from threads, despite GIL interference, and threads do share the same memory. Your case doesn't appear to be either I/O bound or meaningfully dependent on third party C extensions which might release the GIL, so it probably won't help here, but in general, the simple way to switch from process-based parallelism to thread-based parallelism (when you're already using multiprocessing
) is to change the import
from:
import multiprocessing
to
import multiprocessing.dummy as multiprocessing
which imports the thread-backed version of multiprocessing
under the expected name, so code seamlessly switches from using processes to threads.
Upvotes: 1
Reputation: 656
As RafaelC hinted, It was an XY problem. I've been able to reduce the execution time to 20 seconds without multiprocessing.
I created a lista list that replaces the dictionary, and, rather than adding to the df4 DataFrame a row for each date, once the lista is full, I transform the lista into a DataFrame.
# Returns the largest day from all the dates (each date has a different number of days)
def longest_series(dic):
largest_series = 0
for date in dic.keys():
# get the last day's table of a specific date
current_series = dic[date][-1][0]
if largest_series < current_series:
largest_series = current_series
return largest_series
ls = longest_series(dic)
l_total_days = list(range(1, ls+1))
s_total_days = set(l_total_days)
# creating lista list, lista is similar to dic
#The difference is that, in lista, every date has the same number of days
#i.e. from 1 to ls, and it does not contain the dates.
# It takes 15 seconds
lista = list()
for date in dic.keys():
present_days = list()
presen_values = list()
for day, val_252, _ in dic[date]:
present_days.append(day)
presen_values.append(val_252)
missing_days = list(s_total_days.difference(set(present_days))) # extra days added to date
missing_values = [None] * len(missing_days) # extra values added to date
all_days_index = list(np.argsort(present_days + missing_days)) # we need to preserve the order between days and values
all_day_values = presen_values + missing_values
lista.append(list(np.array(all_day_values)[all_days_index]))
# It takes 4 seconds
df = pd.DataFrame(lista, index= dic.keys(), columns=l_total_days)
Upvotes: 0