Reputation: 4015
I am trying to pass a big pandas dataframe as a function argument to a dask.distributed worker. Here is what I tried (X is my dataframe):
1. Passing the data directly to the function:
def test(X):
    return X

f = client.submit(test, X)
f.result()
2. Saving the dataframe in an initialization function:
def worker_init(r_X):
    global X
    X = r_X

client.run(worker_init, X)
3. Scattering the dataframe across all nodes and then using it via futures:
def test(X):
    return X

f_X = client.scatter(X, broadcast=True)
f = client.submit(test, f_X)
f.result()
None of the variants works for my case. Variants 1 and 2 behave almost identically: the dask-scheduler's memory grows with every task and is never freed, until the scheduler runs out of memory and the task fails.
Variant 3 does not work because instead of the pandas dataframe I get back garbage.
How can I send the dataframe to a worker without causing a MemoryError on the scheduler?
The full code of variant 3, which is supposed to be memory efficient but does not even pass the dataframe correctly:
import pandas as pd
import numpy as np
from distributed import Client

client = Client('localhost:8786')

X = np.random.rand(10000, 100)
X = pd.DataFrame(X)
f_X = client.scatter(X, broadcast=True)

def test(X):
    return X

f = client.submit(test, f_X)
f.result()[:10]
# output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Upvotes: 4
Views: 2659
Reputation: 28683
client.scatter checks for a list of inputs, so when you pass in the dataframe, you accidentally unpack it into a list of series. You should instead do
f_X = client.scatter([X], broadcast=True)
and now you have the one dataframe on every worker. Here f_X is also a list, containing the single future, so you would then want
f = client.submit(test, f_X[0])
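For completeness, here is a minimal sketch of the question's variant 3 with both fixes applied (the scheduler address and dataframe shape are reused from the question):

import pandas as pd
import numpy as np
from distributed import Client

client = Client('localhost:8786')  # same scheduler address as in the question

X = pd.DataFrame(np.random.rand(10000, 100))

# Wrap the dataframe in a list so scatter ships it as a single object
f_X = client.scatter([X], broadcast=True)  # a list holding one future

def test(X):
    return X

# Pass the single future, not the list
f = client.submit(test, f_X[0])
f.result().shape  # (10000, 100): the whole dataframe comes back intact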
In general, you will be better off if you can generate or load your data within functions on the workers, rather than passing it from your client, which requires fitting the whole thing into local memory on the client, copying the data, and paying serialization costs along the way.
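A minimal sketch of that pattern, assuming the data can be generated (or read from shared storage) inside a task on a worker; load_data is an illustrative helper, not part of the dask API:

import numpy as np
import pandas as pd
from distributed import Client

client = Client('localhost:8786')  # assumes the same scheduler as above

def load_data():
    # Build (or read) the dataframe on a worker, so it never has to be
    # serialized and shipped from the client.
    return pd.DataFrame(np.random.rand(10000, 100))

def use_data(X):
    return X.shape

f_X = client.submit(load_data)      # the data lives only on a worker
f = client.submit(use_data, f_X)    # later tasks receive the future
f.result()                          # (10000, 100)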
Upvotes: 6