Reputation: 35
I have a large array arr1 of shape (k, n), where both k and n are of the order 1e7. Each row is sparse, containing only a few hundred non-zero elements. For each of the k rows I need to do an element-wise multiplication with arr2 of shape (1, n).
Currently I perform this multiplication using the multiply method of scipy.sparse.csc_matrix. The multiplication happens inside a function that I am minimising, which means it is evaluated thousands of times and causes a large computational load. More importantly, I've found that this function runs on a single core.
I've instead tried to find ways of parallelising this calculation by splitting the array into sub-arrays along k to calculate in parallel. Much to my dismay, I find that the parallelised version runs even slower. So far I've tried implementations in Dask, Ray, and multiprocessing. Below are the implementations I've been using on a machine with ~500GB RAM and 56 CPUs.
I don't understand why the parallel versions run so slowly. This is my first time parallelising my own code, so any assistance is greatly appreciated.
import scipy.sparse as scisp
import numpy as np
import dask.array as da
import dask
import multiprocessing as mp
import ray
import psutil
rng = np.random.default_rng()
rows = np.zeros((5600, 1_000_000))
rows[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
scisp_arr1 = scisp.coo_matrix(rows)
scisp_arr1 = scisp.csc_matrix(scisp_arr1)
arr2 = rng.uniform(size=(1, 1_000_000))
arr2 = scisp.csc_matrix(arr2)
# stack the 5600-row block 1000 times to build the full (5_600_000, 1_000_000) sparse array
arr1 = scisp.vstack([scisp_arr1] * 1000)
arr1 = scisp.csc_matrix(arr1)
%%time
arr1.multiply(arr2).sum()
CPU times: user 4.92 s, sys: 2.72 s, total: 7.64 s
Wall time: 7.64 s
%%time
def f(arr1, arr2):
    return arr1.multiply(arr2)

delayed_multiply = dask.delayed(f)
steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
    total.append(delayed_multiply(arr1[i:i+steps], arr2).sum())
total = dask.delayed(sum)(total)
total.compute()
CPU times: user 1min 13s, sys: 49 s, total: 2min 2s
Wall time: 55.5 s
ray.init(num_cpus=psutil.cpu_count())
%%time
@ray.remote
def f(arr1, arr2):
    return arr1.multiply(arr2).sum()

steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
    total.append(f.remote(arr1[i:i+steps], arr2))
sum(ray.get(total))
CPU times: user 52.4 s, sys: 9.39 s, total: 1min 1s
Wall time: 59.4 s
%%time
steps = arr1.shape[0]//56
chunks = [(arr1[i:i+steps], arr2) for i in range(0, arr1.shape[0], steps)]

def f(arr1, arr2):
    return arr1.multiply(arr2).sum()

def main(args):
    pool = mp.Pool(mp.cpu_count())
    result = pool.starmap(f, args)
    pool.close()
    return result

sum(main(chunks))
CPU times: user 49.8 s, sys: 41.9 s, total: 1min 31s
Wall time: 1min 39s
Following @michael-delgado's suggestion, I've made the following updated attempt using Dask:
import sparse  # pydata/sparse, used for the COO blocks

def foo(arr):
    arr[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
    return arr

chunk_len = int(0.8*psutil.virtual_memory().available // 1e6 // psutil.cpu_count() // 8)
arr1 = da.zeros((5_600_000, 1_000_000), chunks=(chunk_len, 1_000_000))
arr1 = foo(arr1)
arr1 = arr1.map_blocks(sparse.COO)
result = arr1.compute()
arr1 = da.from_array(result, chunks=(chunk_len, 1_000_000))
---
%%time
arr2 = da.random.uniform(size=(1, 1_000_000))
K = (arr1*arr2).sum(axis=1)
final_result = np.log(K.compute()).sum(axis=0)
CPU times: user 2min 5s, sys: 51 s, total: 2min 56s
Wall time: 5.71 s
The same operation using scipy.sparse on a single core gives me:
arr1_scipy = result.tocsc()
---
%%time
arr2 = scisp.csc_matrix(rng.uniform(size=(1, 1_000_000)))
K = arr1_scipy.multiply(arr2).sum(axis=1)
final_result = np.log(K).sum(axis=0)
CPU times: user 4.88 s, sys: 1.65 s, total: 6.53 s
Wall time: 6.53 s
I am surprised that there isn't a larger improvement. Is this simply due to overhead from parallelisation? Could the Dask implementation be improved further?
Upvotes: 2
Views: 364
Reputation: 16551
In the updated code snippet inside the question, there is a .compute step, which reduces the benefit of using dask:
# deleting/commenting out these two lines will keep arr1 as a dask array
#result = arr1.compute()
#arr1 = da.from_array(result, chunks=(chunk_len, 1_000_000))
# the downstream code doesn't need to change except for putting .compute
# at the very end, rather than inside np.log (it's a very minor impact)
arr2 = da.random.uniform(size=(1, 1_000_000))
K = (arr1*arr2).sum(axis=1)
final_result = np.log(K).sum(axis=0).compute()
If you are inside a notebook, re-running the last snippet a second time will show the speed improvement.
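Since the function in the question is evaluated thousands of times against the same arr1, one further option worth considering (a minimal sketch under that assumption, not something shown in the question) is to persist the chunked sparse array in memory once, so that each evaluation only schedules the multiply-and-reduce rather than rebuilding arr1:
import dask.array as da
import numpy as np

# materialise the chunked sparse array in memory once; arr1 remains a dask array
arr1 = arr1.persist()

def objective(arr1, arr2):
    # the same lazy reduction as above; only the scalar result comes back to the client
    K = (arr1 * arr2).sum(axis=1)
    return np.log(K).sum(axis=0).compute()
persist() keeps the computed chunks in memory while arr1 stays a dask array, so the downstream code does not need to change.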
Upvotes: 2
Reputation: 15432
If I'm understanding your implementations correctly, you haven't actually partitioned the arrays in any of these cases, so all you've done is run the exact same workflow, but on a different thread. The "parallel" execution time is therefore the original runtime plus the overhead of setting up the distributed job scheduler and passing everything to the second thread.
If you want to see any total time improvements, you'll have to actually rewrite your code to operate on subsets of the data.
In the dask case, use dask.array.from_array to split the array into multiple chunks, then rewrite your workflow to use dask.array operations rather than numpy ones. Alternatively, partition the data yourself and run your function on subsets of the array using dask distributed's client.map (see the quickstart).
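As an illustration of the second approach, here is a minimal sketch (not code from the question) that partitions arr1 into row blocks and maps the multiply-and-sum over them with client.map, assuming arr1 and arr2 are the scipy.sparse matrices built in the question; the block size of 100_000 rows and the helper name block_multiply are arbitrary choices:
from dask.distributed import Client

client = Client()  # local cluster, one worker process per core by default

def block_multiply(block, arr2):
    # element-wise multiply one row block by arr2 and reduce it locally on the worker
    return block.multiply(arr2).sum()

step = 100_000  # rows per block; tune so each block fits comfortably on a worker
blocks = [arr1[i:i + step] for i in range(0, arr1.shape[0], step)]

# ship arr2 to every worker once instead of serialising it with every task
arr2_future = client.scatter(arr2, broadcast=True)

futures = client.map(block_multiply, blocks, arr2=arr2_future)
total = sum(client.gather(futures))
Scattering arr2 up front matters here because otherwise it would be pickled and shipped with every single task.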
None of these approaches will be easy, and you need to recognize that there is overhead (both in terms of actual compute/network usage/memory etc., as well as a real investment of your time) in any of these, but if total runtime is important then it'll be worth it. See the dask best practices documentation for more background.
Update:
After your iteration with dask.array, your implementation is now faster than the single-threaded wall time, and yes, the additional CPU time is overhead. For your first time trying this, getting it to be faster than numpy/scipy (which, by the way, are already heavily optimized and are likely parallelizing under the hood in the single-threaded approach) is a huge win, so pat yourself on the back. Getting this to be even faster is a legitimate challenge that is far outside the scope of this question. Welcome to parallelism!
Additional reading:
Upvotes: 2