catethos

Reputation: 123

Writing to redis from dask workers

Suppose I have a function that does some processing and stores the result in a Redis server:

import redis

r = redis.StrictRedis()

def process(data):
    # ... do some work ...
    r.set(...)  # store the result (redis-py clients use set(), not put())

Now I have a large set of data, and I want to use Dask to parallelize the processing. Something like:

from dask.distributed import Client

client = Client()
for x in data:
    client.submit(process, x)

But I am getting KeyError(<function process>). Any ideas?

EDIT

It works, per @MRocklin's answer below, once the connection initialization is placed inside the function. I assume the connection will be destroyed and recreated as workers come and go. Would it be more efficient to rewrite my function to accept a batch of data, like this? (A submission sketch follows the code.)

import redis

def process(batches_data):
    r = redis.StrictRedis()
    for batch in batches_data:
        # ... do some work ...
        r.set(...)  # store the result for this batch
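
For concreteness, here is roughly how I would submit the batched version; the batch_size value and the chunking list comprehension are just illustrative, not anything Dask requires:

from dask.distributed import Client, wait

client = Client()

batch_size = 1000  # illustrative; tune to the workload
batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]

futures = [client.submit(process, batch) for batch in batches]  # one task per batch
wait(futures)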

Upvotes: 3

Views: 984

Answers (1)

MRocklin

Reputation: 57311

My first guess is that your object r doesn't serialize well. This is fairly typical as objects with live connections often refuse to be serialized (with good reason).
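
To illustrate the idea (a demonstration only, not your actual traceback): a client connection ultimately wraps a socket, and sockets cannot be pickled, so anything holding one will fail to serialize.

import pickle
import socket

s = socket.socket()   # the kind of object a live connection holds
pickle.dumps(s)       # raises TypeError: sockets can't be pickled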

Instead, you might try establishing the connection within the function:

def process(data):
    r = redis.StrictRedis()
    # ... do some work ...
    r.set(...)

Additionally, I recommend that you hold onto the futures produced by submit. Otherwise Dask will assume that you no longer care about these tasks and decide that it can ignore them:

from dask.distributed import wait

futures = [client.submit(process, x) for x in data]
wait(futures)
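
If you also want the return values back on the client (or to surface any exception raised inside process), gathering the same futures should work:

results = client.gather(futures)  # re-raises a task's exception, if any failed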

If this doesn't resolve your problem then I recommend editing your original question with a more complete exception and traceback.

Upvotes: 4
