keiv.fly

Reputation: 4015

How to pass a pandas dataframe to dask distributed workers?

I am trying to pass a big pandas dataframe as a function argument to a worker of dask distributed. What I tried (X is my dataframe):

1 Passing the data directly to function:

def test(X):
    return X

f = client.submit(test, X)
f.result()

2 Saving a dataframe in an initialization function.

def worker_init(r_X):
    global X
    X = r_X

client.run(worker_init, X)

3 Scattering the dataframe across all nodes and then using it via futures

def test(X):
    return X

f_X = client.scatter(X, broadcast=True)
f = client.submit(test, f_X)
f.result()

None of the variants works for my case. Variants 1 and 2 behave almost identically: the dask-scheduler's memory grows with every task and is never freed, until the scheduler runs out of memory and the task fails.

Variant 3 does not work either: instead of the pandas dataframe I get back some garbage.

How can I send the dataframe to a worker without running into a MemoryError on the scheduler?

The full code of variant 3, which is supposed to be memory efficient but does not even pass the dataframe correctly:

import pandas as pd
import numpy as np
from distributed import Client
client = Client('localhost:8786')
X = np.random.rand(10000, 100)
X = pd.DataFrame(X)
f_X = client.scatter(X, broadcast=True)

def test(X):
    return X

f = client.submit(test, f_X)
f.result()[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Upvotes: 4

Views: 2659

Answers (1)

mdurant

Reputation: 28683

client.scatter checks for a list of inputs, so when you pass in the dataframe directly, it gets iterated over and accidentally unpacked into a list of series. You should instead do

f_X = client.scatter([X], broadcast=True)

and now you have the one dataframe on every worker. Here f_X is also a list, containing the single future, so you would then want f = client.submit(test, f_X[0]).

In general, you will be better off if you can generate/load your data within functions on the workers rather than passing it from your client, which requires fitting the whole thing into the client's local memory, copying that data, and paying serialization costs along the way.

Upvotes: 6
