Aman J
Aman J

Reputation: 1855

Parallelize nested large `(15e4 * 15e4)` for loop to get a pairwise matrix

I am trying to parallelize the following code, which creates a pairwise result for each row. As shown below.

def get_custom_value(i, j):
    first = df[df['id'] == i]
    second = df[df['id'] == j]

    return int(first['val_1']) * int(second['val_1']) +\
            int(first['val_2']) * int(second['val_2'])

df = pd.DataFrame(
    {
        'id' : range(4),
        'val_1' : [3, 4, 5, 1],
        'val_2' : [2, 3, 1, 1]
    }
)

n = df.shape[0]

result = []

for i in range(n):
    for j in range(i+1, n):
        temp_value = get_custom_value(i, j)
        result.append([i, j, temp_value])
        if len(result) > 1e5:
            # store it in a local file and reset the result object.
            # Assume here some code to write to a local file here.
            result = []

print(result)

What I have tried? Below is the code: The code hangs. Without any error.

import itertools
import multiprocessing

paramlist = list(itertools.combinations(df.id, 2))
pool = multiprocessing.Pool(processes = 2)
result  = pool.map(get_custom_value, paramlist)
print(result)

Can I use dask for this?

The actual data has more than 150,000 records. i.e final result will have (150,000 * 150,000 * 1/2) pairs/rows. Given the huge size of the result object, I have a condition which if satisfied then the result is stored. Hence, the actual result object will not exceed my RAM.

Upvotes: 1

Views: 253

Answers (3)

David Gruzman
David Gruzman

Reputation: 8088

Lets reframe problem as building and processing cartesian product of two datasets (or dataset with itself). In distributed setting it is usually solved as following: Lets call our datasets A and B

  • We partition bigger dataset over cluster
  • We broadcast (replicate) smaller dataset to each machine in the cluster
  • We calculate and process caretesian product in each machine. If smaller dataset is not "that small" and we can not broadcast it, we can do work in iterations, each time broadcasting another partition of it.

In our case, we have one dataset, and it is small. So what should be done is:

  • we should partition our dataframe over cluster, by just creating dask dataframe with enough partitions. Lets call it "A_Partitioned"
  • We should broadcast the same dataset over cluster using scatter function over the cluster http://distributed.dask.org/en/stable/locality.html Lets call it A_Broadcasted
  • Now we can do map_partiions over A_Partitioned where it will do nested loops over partition and A_Broadcasted

Upvotes: 0

FirefoxMetzger
FirefoxMetzger

Reputation: 3260

Yes, you can use dask for this. However, I would recommend adding a reduction to the computation that directly computes the quantity you are interest in (summary statistics [mean, median, ...], max/min over all values, etc.). Otherwise, you are looking to write something in the ballpark of 250 GB of binary data, which isn't the most efficient use of computing time and disk space (unless your use-case demands it).

import pandas as pd
import numpy as np
import dask.array as da

# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
    {
        # no need for an ID column, because you have a range index by default
        "val_1": np.random.randint(0, 500, size=num_records),
        "val_2": np.random.randint(0, 500, size=num_records),
    }
)

# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2 + 1).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)


val_1 = df.loc[:, "val_1"].to_numpy().astype(int)
val_2 = df.loc[:, "val_2"].to_numpy().astype(int)
custom_value_vector = da.take(val_1, idx_i) * da.take(val_1, idx_j) + da.take(
    val_2, idx_i
) * da.take(val_2, idx_j)
result_vector = da.stack([idx_i, idx_j, custom_value_vector], axis=-1)
rechunked_result = da.rechunk(result_vector, (5e7, 3))

# now you can reduce to your desired summary statistic
# saving uses da.to_npy_stack
reduced = da.min(rechunked_result)

The above will run in a few sec, but that is just setting up the computational graph. You will still have to run it

from dask.diagnostics import ProgressBar

with ProgressBar():
    value = reduced.compute()
# [########################################] | 100% Completed |  3min  3.8s

If you can't vectorize, you can still use dask to do your computation. However, be aware that things will be significantly slower:

import pandas as pd
import numpy as np
import dask.array as da
from dask.diagnostics import ProgressBar


# generate fake data of desired size
num_records = int(150000)
df = pd.DataFrame(
    {
        "id": np.arange(num_records),
        "val_1": np.random.randint(0, 500, size=num_records),
        "val_2": np.random.randint(0, 500, size=num_records),
    }
)

# index magix to walk over the upper triangle of a fortran contigous array without diagonal
# Note: this is a different order than you use in your for loops, but
# order doesn't matter here.
N = len(df) - 1
flat_index = da.arange(N * (N + 1) // 2, chunks=(1e6,))
idx_j = (da.floor((da.sqrt(1 + 8 * flat_index) - 1)) // 2).astype(int)
idx_i = flat_index - (idx_j * (idx_j + 1) // 2)
idx_j = idx_j + 1

def get_custom_value(multi_idx):
    i, j = multi_idx  # I added this
    first = df[df['id'] == i]
    second = df[df['id'] == j]

    return int(first['val_1']) * int(second['val_1']) +\
            int(first['val_2']) * int(second['val_2'])

multi_idx = da.stack((idx_i, idx_j), axis=-1)
custom_value = da.apply_along_axis(get_custom_value, -1, multi_idx, dtype=int, shape=tuple())
result = da.stack([idx_i, idx_j, custom_value])
reduced = da.min(result)


with ProgressBar():
    value = reduced.compute(scheduler="processes")

I didn't run this to completion, so I can't share any timings. (I stopped after around 10 minutes. The point here is: it works, but you will need to bring a healthy amount of patience.)

Upvotes: 2

Jérôme Richard
Jérôme Richard

Reputation: 50278

The algorithm used is very inefficient. Indeed, both df['id'] == i and df['id'] == j iterate over the whole id column containing 150_000 items in your real-world use-case. Thus, your algorithm runs in O(n^3) time and performs roughly 3_375_000_000_000_000 comparisons while the best algorithm runs in O(n^2) time.

Moreover, CPython loops are very slow and you should avoid using them as much as possible. Fetching Pandas dataframe cells by name is very slow too. Instead, you can use vectorized Pandas/Numpy functions.

Additionally, the output is not efficient too: CPython lists are a bit slow (because of dynamic reference-counted objects) and storing the (i,j) values consume three time more memory. You can store the result in a matrix. Possibly a sparse one or alternatively in a list of compact Numpy arrays.

Furthermore, Bigger data structures are generally slower. If you want a computation to be done very quickly, you generally need to make it fit in the CPU caches (of few MiB). Thus, process you dataframe efficiently you certainly need to compute it in-situ.

Here is a relatively efficient solution using Numpy:

import numpy as np
val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())
result = val_1.reshape(-1, 1) * val_1 + val_2.reshape(-1, 1) * val_2

It produces a matrix where the (i,j) item can be found using result[i, j]. reshape(-1, 1) is used to transpose the horizontal vector so to get a vertical one and then benefit from Numpy broadcasting. Note that you can filter the upper-triangular part using np.triu(result, 1).

You can generate the result line by line so not to allocate a huge array:

val_1 = np.ascontiguousarray(df['val_1'].to_numpy())
val_2 = np.ascontiguousarray(df['val_2'].to_numpy())

for i in range(n-1):
    first_val_1 = val_1[i]
    first_val_2 = val_2[i]
    line = first_val_1 * val_1[i+1:] + first_val_2 * val_2[i+1:]

    # Store the line if needed with the i value so to know where it is

If you really want to generate an inefficient list from the Numpy array lines, then you can do that with np.vstack((np.repeat(i, n-i-1), np.arange(i+1, n), line)).T.tolist(). But I strongly advise you not to do that (there is certainly no need to use lists). Note that you can load/store Numpy arrays efficiently using np.load and np.save.

Here are the performances of the different approaches on my machine (with a i5-9600KF processor, 2 DDR4 channels reaching 40 GiB/s and a fast Nvme SSD that can practically write big files at 800 MiB/s) on a random Pandas dataframe with 15_000 records:

Initial code:                 60500    seconds  (estimation)
Numpy matrix:                     0.71 second
Numpy line-by-line:               0.24 second

Time to store all the lines:      0.50 second   (estimation)
in a compact way on my SSD

Thus, the Numpy, line-by-line solution is about 250_000 times faster than the initial code! All of this without using multiple cores. In fact, using multiple cores will not be much faster in this case because the RAM is a limited shared resource and file storages are not much faster in parallel on most machines (in fact, HDD are slower when used in parallel because they are inherently sequential). If you really want to do that, then using multiprocessing is definitively not the good tool. Please consider using Numba or Cython instead.

Upvotes: 5

Related Questions