Reputation: 51
I'm using dask/distributed to submit 100+ evaluations of a function to a multi-node cluster. Each evaluation is very costly, about 90 seconds of CPU time. I've noticed, though, that there seems to be a memory leak: all workers grow in size over time, even though the function I'm evaluating is not pure. Here's sample code to reproduce this behavior:
import numpy as np
from dask.distributed import Client

class Foo:
    def __init__(self):
        self.a = np.random.rand(2000, 2000)  # dummy data, not really used

    @staticmethod
    def myfun1(k):
        return np.random.rand(10000 + k, 100)

    def myfun2(self, k):
        return np.random.rand(10000 + k, 100)

client = Client('XXX-YYY:8786')
f = Foo()
tasks = client.map(f.myfun2, range(100), pure=False)
results = client.gather(tasks)
tasks = []
If client.map() is called to execute f.myfun1() (which is just a static method), the workers don't grow in size. However, if one calls f.myfun2(), worker size grows considerably (e.g. 50 MB -> 400 MB) after just one client.map() call as above. Also, client.close() does nothing to reduce the workers' size.
Is this a memory leak, or am I not using dask.distributed correctly? I definitely don't care about the results of my calculations being available afterwards or shared on the cluster. FWIW, tested with distributed v1.19.1 and Python 3.5.4.
Upvotes: 5
Views: 975
Reputation: 57271
Nice example.
Your myfun2 method is attached to your f = Foo() object, which carries around a decently large attribute (f.a). This makes f.myfun2 quite expensive to move around, and you're creating 100 copies of it. If you can, it's best to avoid using methods of large objects in a distributed setting. Instead, consider using plain functions.
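For example, here is a minimal sketch of that restructuring (assuming, as in the example above, that the large state in Foo isn't actually needed by the computation). A module-level function carries no instance state, so only the small integer argument is serialized with each task:

import numpy as np
from dask.distributed import Client

def myfun(k):
    # Plain function: only the small argument `k` travels with each task,
    # not a multi-megabyte bound object.
    return np.random.rand(10000 + k, 100)

client = Client('XXX-YYY:8786')
tasks = client.map(myfun, range(100), pure=False)
results = client.gather(tasks)

If the object's state is genuinely needed by the computation, another option is to ship it to the workers once with client.scatter(f) and pass the resulting future as an argument to client.map, rather than letting every task carry its own copy.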
Upvotes: 1