jpp1

Reputation: 2097

How exactly does Ray share data to workers?

There are many simple tutorials, and also SO questions and answers, which claim that Ray somehow shares data with the workers, but none of them go into the exact details of what gets shared, how, and on which operating systems.

For example, in this SO answer: https://stackoverflow.com/a/56287012/1382437 an np array gets serialised into the shared object store and is then used by several workers, all accessing the same data (code copied from that answer):

import numpy as np
import ray

ray.init()

@ray.remote
def worker_func(data, i):
    # Do work. This function will have read-only access to
    # the data array.
    return 0

data = np.zeros(10**7)
# Store the large array in shared memory once so that it can be accessed
# by the worker tasks without creating copies.
data_id = ray.put(data)

# Run worker_func 10 times in parallel. This will not create any copies
# of the array. The tasks will run in separate processes.
result_ids = []
for i in range(10):
    result_ids.append(worker_func.remote(data_id, i))

# Get the results.
results = ray.get(result_ids)

The ray.put(data) call puts the serialised representation of the data into the shared object store and passes back a handle/id for it.

Then, when worker_func.remote(data_id, i) is invoked, worker_func gets passed the deserialised data.

But what exactly happens in between? Clearly the data_id is used to locate the serialised version of data and deserialise it.
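
For concreteness, here is a minimal sketch of what the handle is and what the worker actually receives (assuming a recent Ray release; the inspect_arg task is a hypothetical name, just for illustration):

import numpy as np
import ray

ray.init()

data = np.zeros(10**7)
data_id = ray.put(data)          # a small ObjectRef handle, not the data itself
print(type(data_id))             # e.g. <class 'ray._raylet.ObjectRef'>

@ray.remote
def inspect_arg(arr, i):
    # By the time the task body runs, the handle has been resolved:
    # `arr` is a plain numpy array backed by the shared object store.
    return type(arr).__name__, arr.shape

print(ray.get(inspect_arg.remote(data_id, 0)))   # ('ndarray', (10000000,))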

Q1: When the data gets "deserialised" does this always create a copy of the original data? I would think yes, but I am not sure.

Once the data has been deserialised, it gets passed to a worker. Now, if the same data needs to get passed to another worker, there are two possibilities:

Q2: When an object that has already been deserialised gets passed to a worker, will it be via another copy or that exact same object? If it is the exact same object, is this using the standard shared memory approach to share data between processes? On Linux this would mean copy-on-write, so does this mean that as soon as the object is written to, another copy of it is created?

Q3: Some tutorials/answers seem to indicate that the overhead of deserialising and sharing data between workers is very different depending on the type of data (Numpy versus non-Numpy), so what are the details there? Why is numpy data shared more efficiently, and is this still efficient when the client tries to write to that numpy array (which I think would always create a local copy for the process)?

Upvotes: 30

Views: 4768

Answers (2)

Pablo

Reputation: 11041

This is a great question, and one of the cool features that Ray has. Ray provides a way to schedule functions in a distributed environment, but it also provides a cluster store that manages data sharing between these tasks.

Here are the kinds of objects that Ray manages (a short sketch after the list shows all three):

  • Objects added with ray.put
  • A result from function.remote
  • A Ray actor (the instantiation of a remote class in a Ray cluster)
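
As a quick sketch (using only the standard Ray task/actor APIs), all three kinds show up in a few lines:

import ray

ray.init()

@ray.remote
def f():
    return 1

@ray.remote
class Counter:
    def __init__(self):
        self.n = 0

    def incr(self):
        self.n += 1
        return self.n

obj_ref = ray.put([1, 2, 3])     # 1. an object added with ray.put
result_ref = f.remote()          # 2. the result of a remote function
counter = Counter.remote()       # 3. an actor (instantiation of a remote class)

print(ray.get(obj_ref), ray.get(result_ref), ray.get(counter.incr.remote()))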

For all of these alternatives, the objects are managed by the Ray Object Store - also known as Plasma in some documents (see Memory Management in Ray Docs, and Object Management in the Ray Architecture Whitepaper).

Given a Ray cluster with multiple nodes, and having each node running multiple processes, Ray may store objects in any of these locations:

  • The local memory space for the running process
  • The shared memory space for all processes in a single node
  • (Only if necessary to reclaim memory) Persistent storage / hard drive

For example, when you call a function remotely in Ray, Ray needs to manage the result from that function. There are two alternatives:

  • If the serialized result is small, then Ray will send it back directly to the caller, and it will be stored in the local memory space of the caller. (see left side of the picture below, where the result is stored in the owner process)
  • If the serialized result is large, then Ray will store it in the shared memory of the node executing the function. (see right side of the picture below, where the result is stored in the shared-memory object store in the local node).

[figure "ray example": a small result is stored directly in the owner process (left); a large result is stored in the node's shared-memory object store (right)]

In general, Ray aims to make these details transparent to the user. As long as you're using the appropriate Ray APIs, Ray will behave as expected, and take care of managing all objects stored in the cluster's object store.


Now onto your questions:

Q1: When does the data get serialized/deserialised?

  • It all depends on whether the data has to be transferred over the network or not. If the data does not need to travel over the network, or be spilled to disk, Ray will try to avoid serializing/deserializing it, because there's a cost to doing that. For example, an object in shared memory does not need to be serialized/deserialized, because it can be directly dereferenced by the processes with access to that memory.

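For example, a numpy array fetched with ray.get can be a zero-copy, read-only view onto shared memory rather than a fresh copy. A minimal sketch (the exact behaviour of the writeable flag depends on the Ray version):

import numpy as np
import ray

ray.init()

ref = ray.put(np.zeros(10**6))

view = ray.get(ref)
# The array is backed directly by the shared object store, so it is read-only:
print(view.flags.writeable)      # typically False

# Writing to it requires an explicit, process-local copy:
local = view.copy()
local[0] = 1.0
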
Q2: When an object that has already been deserialised gets passed to a worker, will it be via another copy or that exact same object?

  • Objects in the Ray Object Store are immutable (except for actors, which are a special kind of object). When Ray shares an object with another worker, it does so because it knows the object will not change (actors, on the other hand, always live in a single worker and cannot be copied to multiple workers).

  • In short: You can't modify the objects in the Ray Object Store. If you want an updated version of an object, you'll need to create a new object.

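A small sketch of the "create a new object" pattern (the updated task is a hypothetical name, just for illustration):

import numpy as np
import ray

ray.init()

old_ref = ray.put(np.arange(5))

@ray.remote
def updated(arr):
    new_arr = arr.copy()    # work on a local copy; the stored object is never mutated
    new_arr += 1
    return new_arr          # Ray stores the result as a *new* object

new_ref = updated.remote(old_ref)
print(ray.get(old_ref))     # [0 1 2 3 4]  (unchanged)
print(ray.get(new_ref))     # [1 2 3 4 5]
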
Q3: Some tutorials/answers seem to indicate that the overhead of deserialising and sharing data between workers is very different depending on the type of data (Numpy versus non-Numpy) so what are the details there?

  • Some data is designed to have a very similar representation in memory and in serialized form. For example, Arrow objects only need to be 'cast' into a byte stream and shared without any special computation. Numpy data is likewise laid out in memory as a C array that can simply be 'cast' to a byte buffer (Python lists, on the other hand, are arrays of references, where the object behind each reference has to be serialized individually). A rough timing sketch after the closure example below makes this concrete.

  • Other kinds of data require more computation to be serialized. For example, serializing a Python function along with its closure may be very slow. Consider the function below: to serialize it, you need to serialize not only the function itself but also all of the variables it accesses from its enclosing context (e.g. MAX_ELEMENTS).

MAX_ELEMENTS = 10

def batch_elements(items):
  # Serializing this function also requires capturing MAX_ELEMENTS
  # from its enclosing module scope.
  arr = []
  for elm in items:
    arr.append(elm)
    if len(arr) >= MAX_ELEMENTS:  # '>=' so a batch holds at most MAX_ELEMENTS items
      yield arr
      arr = []

  if arr:
    yield arr
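
To make the cost difference from the first point concrete, here is a rough timing sketch comparing a contiguous numpy buffer with an equivalent Python list (the absolute numbers are illustrative and will vary by machine and Ray version):

import time

import numpy as np
import ray

ray.init()

n = 10**6
as_array = np.arange(n, dtype=np.float64)    # one contiguous C buffer
as_list = as_array.tolist()                  # ~10**6 individual Python objects

for name, obj in [("numpy array", as_array), ("python list", as_list)]:
    start = time.perf_counter()
    ray.get(ray.put(obj))                    # round-trip through the object store
    print(f"{name}: {time.perf_counter() - start:.4f} s")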

I hope that helps - I'm happy to dive further into this.

Upvotes: 6

Ben L

Reputation: 355

Ray runs a Redis server internally to share data across processes.

If you want to know more about it: Redis opens a port on localhost to get/put data, communicating with multiple processes. All data basically has to be a "string" or a "list of strings", so Ray also implements the serialization/deserialization to and from Redis.

Upvotes: 1
