Reputation: 123
Suppose I have a function that does some processing and stores the result in a Redis server
import redis

r = redis.StrictRedis()

def process(data):
    # ... do some work ...
    r.set(...)
Now I have a large set of data, and I want to use Dask to parallelize the processing. Something similar to
from dask.distributed import Client

client = Client()
for x in data:
    client.submit(process, x)
But I am getting KeyError(<function process>). Any ideas?
EDIT
It works if I place the connection initialization inside the function, as @mrocklin's answer below suggests. I assume that the connection will be destroyed and recreated as workers come and go. Would it be more efficient if I rewrote my function to accept a batch of data?
def process(batches_data):
    r = redis.StrictRedis()
    for batch in batches_data:
        # ... do some work ...
        r.set(...)
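For example, the submission could then look something like this (just a sketch, assuming data is a plain list and an arbitrary batch size of 1000):

batch_size = 1000  # arbitrary; would need tuning for the workload
batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
futures = [client.submit(process, batch) for batch in batches]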
Upvotes: 3
Views: 984
Reputation: 57311
My first guess is that your object r doesn't serialize well. This is fairly typical, as objects with live connections often refuse to be serialized (with good reason).
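One quick way to check that guess is to try pickling the object directly on the client (a minimal sketch, assuming redis-py is installed there):

import pickle
import redis

r = redis.StrictRedis()
try:
    pickle.dumps(r)  # objects holding live connections often fail here
except Exception as e:
    print("r is not picklable:", e)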
Instead, you might try establishing the connection within the function
def process(data):
    r = redis.StrictRedis()
    # ... do some work ...
    r.set(...)
Additionally, I recommend that you hold onto the futures produced by submit. Otherwise Dask will assume that you no longer care about these tasks and decide that it can ignore them:
from dask.distributed import wait

futures = [client.submit(process, x) for x in data]
wait(futures)
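If process eventually returns something you need on the client, you can also pull the results back (a small sketch, assuming the return values are small enough to collect locally):

results = client.gather(futures)  # blocks until all tasks finish and returns their results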
If this doesn't resolve your problem then I recommend editing your original question with a more complete exception and traceback.
Upvotes: 4