Louis-Amand

Reputation: 198

Ray spilling objects seem to be accumulating

I am using Ray to parallelize some computations, but it seems to keep accumulating spilled objects.

I don't mind it spilling objects to my hard drive, but I do mind if that means using over 130 GiB to process about 1.6 GiB of simulations.

Below is a trace of what is happening:

Number of steps: 55 (9,091 simulations each)
0%
(raylet) Spilled 3702 MiB, 12 objects, write throughput 661 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
(raylet) Spilled 5542 MiB, 17 objects, write throughput 737 MiB/s.
2%
(raylet) Spilled 9883 MiB, 33 objects, write throughput 849 MiB/s.
5%
(raylet) Spilled 16704 MiB, 58 objects, write throughput 997 MiB/s.
13%
(raylet) Spilled 32903 MiB, 124 objects, write throughput 784 MiB/s.
29%
(raylet) Spilled 66027 MiB, 268 objects, write throughput 661 MiB/s.
53%
(raylet) Spilled 131920 MiB, 524 objects, write throughput 461 MiB/s.
60%

And here is the code I am running:

import numpy as np
import ray

# num_cpus and the @ray.remote function get_res_ray are defined elsewhere.

def get_res_parallel(simulations, num_loads=num_cpus):
    load_size = simulations.shape[0] / num_loads
    simulations_per_load = [simulations[round(n * load_size): round((n+1) * load_size)]
                            for n in range(num_loads)]
    # Each task returns a 2D numpy array.
    results = ray.get([get_res_ray.remote(simulations=simulations)
                       for simulations in simulations_per_load])
    return np.vstack(results)

MAX_RAM = 6 * 2**30  # 6 GiB

def get_expected_res(simulations, MAX_RAM=MAX_RAM):

    expected_result = np.zeros(shape=87_381, dtype=np.float64)
    bytes_per_res = len(expected_result) * (64 // 8)

    num_steps = simulations.shape[0] * bytes_per_res // MAX_RAM + 1
    step_size = simulations.shape[0] / num_steps

    print(f"Number of steps: {num_steps} ({step_size:,.0f} simulations each)")
    for n in range(num_steps):
        print(f"\r{n / num_steps:.0%}", end="")
        step_simulations = simulations[round(n * step_size): round((n+1) * step_size)]
        results = get_res_parallel(simulations=step_simulations)
        expected_result += results.mean(axis=0)
    print(f"\r100%")

    return expected_result / num_steps

Running on a Mac M1 with 16 GiB of RAM, Ray 2.0.0 and Python 3.9.13.

Question
Given my code, is this normal behavior?
What can I do to resolve this problem? Force garbage collection?

Upvotes: 0

Views: 1610

Answers (1)

Stephanie Wang

Reputation: 254

Do you know the expected size of the array returned by get_res_ray?

Ray will spill objects returned by remote tasks as well as objects passed to remote tasks, so in this case there are two possible places that can cause memory pressure:

  1. The ObjectRefs returned by get_res_ray.remote
  2. The simulations passed to get_res_ray.remote. Since these are large, Ray will automatically put them in the local object store to reduce the size of the task definition (the sketch below makes this step explicit).
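
For illustration only, here is a minimal sketch of the explicit equivalent of that automatic step, using ray.put; get_res_ray and simulations_per_load stand in for the names from the question:

# Hypothetical sketch: what Ray does under the hood when a large
# array is passed directly to a remote task.
sim_refs = [ray.put(chunk) for chunk in simulations_per_load]
result_refs = [get_res_ray.remote(simulations=ref) for ref in sim_refs]
results = ray.get(result_refs)
# Dropping the refs lets Ray evict the input chunks once the tasks finish.
del sim_refs, result_refs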

Spilling is expected if the combined size of these objects exceeds 30% of the RAM on your machine (the default size of Ray's object store). Increasing the object store size is not recommended, since that just shifts the memory pressure onto your functions instead.
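
If you want to see where that 30% figure enters, the store size can be pinned explicitly at startup; this illustrates the knob, it is not a recommendation to raise it, and the 4 GiB value is an arbitrary example:

# Illustration only: give the object store an explicit size at startup.
ray.init(object_store_memory=4 * 1024**3)  # 4 GiB, example value

You can also inspect which objects are currently pinned or spilled with the ray memory CLI.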

Instead, you can process fewer items per iteration and/or release ObjectRefs sooner. In particular, try to release the results from the previous iteration as soon as possible, so that Ray can GC the objects for you. You can do this by calling del results once you're done using them.
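
As a rough fragment of what that looks like inside get_res_parallel (result_refs is an assumed name for the list of ObjectRefs):

results = ray.get(result_refs)  # may be zero-copy views into the object store
stacked = np.vstack(results)    # materializes an independent copy
del results, result_refs        # unpins the objects so Ray can free them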

Here's a full suggestion that does the same thing by feeding the array results into another task instead of getting them on the driver. This is usually a better approach because it avoids adding memory pressure on the driver and you're less likely to accidentally pin results in the driver's memory.

@ray.remote
def mean(*arrays):
    return np.vstack(arrays).mean(axis=0)

def get_res_parallel(simulations, num_loads=num_cpus):
    load_size = simulations.shape[0] / num_loads
    simulations_per_load = [simulations[round(n * load_size): round((n+1) * load_size)]
                            for n in range(num_loads)]
    # 2D numpy arrays
    # Use the * syntax in Python to unpack the ObjectRefs as function arguments.
    result = mean.remote(*[get_res_ray.remote(simulations=simulations)
                           for simulations in simulations_per_load])
    # We never have the result arrays stored in driver's memory.
    return ray.get(result)

MAX_RAM = 6 * 2**30  # 6 GiB

def get_expected_res(simulations, MAX_RAM=MAX_RAM):

    expected_result = np.zeros(shape=87_381, dtype=np.float64)
    bytes_per_res = len(expected_result) * (64 // 8)

    num_steps = simulations.shape[0] * bytes_per_res // MAX_RAM + 1
    step_size = simulations.shape[0] / num_steps

    print(f"Number of steps: {num_steps} ({step_size:,.0f} simulations each)")
    for n in range(num_steps):
        print(f"\r{n / num_steps:.0%}", end="")
        step_simulations = simulations[round(n * step_size): round((n+1) * step_size)]
        expected_result += get_res_parallel(simulations=step_simulations)
    print(f"\r100%")

    return expected_result / num_steps

Upvotes: 3
