Reputation: 2285
I have implemented a data iterator that takes objects from two numpy arrays and does very intensive CPU computations on them before returning them. I want to parallelize this using Dask. Here is a simple version of this iterator class:
import numpy as np

class DataIterator:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        item1, item2 = self.x[idx], self.y[idx]
        # Do some very heavy computations here by
        # calling other methods and return
        return item1, item2
x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))
data_gen = DataIterator(x, y)
Right now, I iterate over the items using a simple for loop like this:
for i, (item1, item2) in enumerate(data_gen):
    print(item1, item2)
But this is really slow. Can someone please help me parallelize it using Dask?
Upvotes: 1
Views: 611
Reputation: 4004
The simplest way to achieve this is to use dask.delayed and decorate the __getitem__ method. The alternative is to turn x and y into Dask arrays and then do the computation in __getitem__ using dask.array operations. Since you have not provided details of the heavy computation, the examples below are for guidance only.
Dask.delayed:
from dask import delayed
import numpy as np

class DataIterator:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    @delayed
    def __getitem__(self):
        item1 = self.x.mean()
        item2 = self.y.sum()
        # Do some very heavy computations here by
        # calling other methods and return
        return item1, item2

x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))
data_gen = DataIterator(x, y)
x_mean, y_sum = data_gen.__getitem__().compute()
Output:
x_mean, y_sum
Out[41]: (8.45, 479)
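Since your original loop computes each item independently, you can also wrap the per-item work itself in delayed and compute all the tasks at once, which keeps the loop structure you already have. A minimal sketch, where heavy is a placeholder standing in for your actual per-item computation:

```python
from dask import delayed, compute
import numpy as np

def heavy(item1, item2):
    # placeholder for the expensive per-item computation
    return item1 * 2, item2 * 3

x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))

# build one lazy task per index; nothing executes yet
tasks = [delayed(heavy)(x[i], y[i]) for i in range(len(x))]

# execute all tasks in parallel and collect the results as a tuple
results = compute(*tasks)
```

By default this runs on the threaded scheduler; for CPU-bound pure-Python work you can pass scheduler="processes" to compute.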
Dask.array:
import dask.array as da
import numpy as np

class DataIterator:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self):
        item1 = self.x.mean()
        item2 = self.y.sum()
        # Do some very heavy computations here by
        # calling other methods and return
        return item1.compute(), item2.compute()

x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))
x = da.from_array(x, chunks=x.shape[0] // 4)
y = da.from_array(y, chunks=y.shape[0] // 4)
data_gen = DataIterator(x, y)
x_mean, y_sum = data_gen.__getitem__()
Output:
x_mean, y_sum
Out[50]: (10.4, 461)
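If the heavy computation can be expressed chunk-by-chunk, da.map_blocks applies a function to corresponding chunks of the Dask arrays in parallel, without writing delayed calls by hand. A sketch, with heavy_block standing in for the real computation:

```python
import dask.array as da
import numpy as np

x = np.random.randint(20, size=(20,))
y = np.random.randint(50, size=(20,))

# split each array into 4 chunks of 5 elements
dx = da.from_array(x, chunks=5)
dy = da.from_array(y, chunks=5)

def heavy_block(a, b):
    # placeholder for the expensive computation, applied per chunk pair
    return a * b

# each chunk pair is processed as a separate parallel task
result = da.map_blocks(heavy_block, dx, dy, dtype=x.dtype).compute()
```

This only works cleanly when the computation on one chunk does not depend on the others.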
Upvotes: 2