Reputation: 2097
There are many simple tutorials and also SO questions and answers out there which claim that Ray somehow shares data with the workers, but none of these go into the exact details of what gets shared how on which OS.
For example in this SO answer: https://stackoverflow.com/a/56287012/1382437 an np array gets serialised into the shared object store and then used by several workers all accessing the same data (code copied from that answer):
import numpy as np
import ray

ray.init()

@ray.remote
def worker_func(data, i):
    # Do work. This function will have read-only access to
    # the data array.
    return 0

data = np.zeros(10**7)

# Store the large array in shared memory once so that it can be accessed
# by the worker tasks without creating copies.
data_id = ray.put(data)

# Run worker_func 10 times in parallel. This will not create any copies
# of the array. The tasks will run in separate processes.
result_ids = []
for i in range(10):
    result_ids.append(worker_func.remote(data_id, i))

# Get the results.
results = ray.get(result_ids)
The ray.put(data) call puts the serialised representation of the data into the shared object store and passes back a handle/id for it. Then, when worker_func.remote(data_id, i) is invoked, worker_func gets passed the deserialised data.
But what exactly happens in between? Clearly the data_id is used to locate the serialised version of data and deserialise it.
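For concreteness, here is a minimal sketch of what I mean, assuming a plain local ray.init() and a recent Ray version (the printed ObjectRef is just illustrative):

import numpy as np
import ray

ray.init()

data = np.zeros(10**7)
data_id = ray.put(data)            # an ObjectRef handle, not the data itself
print(data_id)                     # something like ObjectRef(00ffffffff...)

# ray.get resolves the handle against the object store and returns the
# deserialised object; for a large numpy array this is a read-only view
# backed by shared memory rather than a private copy.
restored = ray.get(data_id)
print(restored.shape, restored.flags.writeable)   # (10000000,) False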
Q1: When the data gets "deserialised", does this always create a copy of the original data? I would think yes, but I am not sure.
Once the data has been deserialised, it gets passed to a worker. Now, if the same data needs to get passed to another worker, there are two possibilities:
Q2: When an object that has already been deserialised gets passed to a worker, will it be via another copy or that exact same object? If it is the exact same object, is this using the standard shared memory approach to share data between processes? On Linux this would mean copy-on-write, so does this mean that as soon as the object is written to, another copy of it is created?
Q3: Some tutorials/answers seem to indicate that the overhead of deserialising and sharing data between workers is very different depending on the type of data (Numpy versus non-Numpy), so what are the details there? Why is numpy data shared more efficiently, and is this still efficient when the client tries to write to that numpy array (which, I think, would always create a local copy for the process)?
Upvotes: 30
Views: 4768
Reputation: 11041
This is a great question, and it touches on one of the cool features of Ray. Ray provides a way to schedule functions in a distributed environment, but it also provides a cluster store that manages data sharing between these tasks.
Here are the kinds of objects that Ray manages in this way:
- objects you place in the store explicitly with ray.put
- the return values of remote calls, i.e. anything produced by function.remote(...)
For all of these alternatives, the objects are managed by the Ray Object Store - also known as Plasma in some documents (see Memory Management in Ray Docs, and Object Management in the Ray Architecture Whitepaper).
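As a small illustration (a sketch assuming a local ray.init(); make_array is just a made-up example task), both kinds of objects come back as ObjectRefs and are fetched the same way with ray.get:

import numpy as np
import ray

ray.init()

# 1) An object placed in the store explicitly:
ref_a = ray.put(np.arange(10))

# 2) An object created as the return value of a remote call; Ray stores
#    the result and hands back an ObjectRef immediately:
@ray.remote
def make_array(n):
    return np.arange(n)

ref_b = make_array.remote(10)

# Both handles are resolved the same way:
a, b = ray.get([ref_a, ref_b])
print(np.array_equal(a, b))   # True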
Given a Ray cluster with multiple nodes, and each node running multiple worker processes, Ray may store objects in any of these locations:
- in the in-process memory of the worker that owns the object (used for small objects)
- in the node's shared-memory object store, which every worker process on that node can read
- spilled to external storage (disk), once the object store fills up
For example, when you call a function remotely in Ray, Ray needs to manage the result from that function. There are two alternatives:
- if the result is small, it is returned directly to the caller and kept in the caller's in-process memory
- if the result is large, it is stored in the shared-memory object store of the node where the task ran, and the caller only receives a handle (ObjectRef) to it
In general, Ray aims to make these details transparent to the user. As long as you're using the appropriate Ray APIs, Ray will behave as expected, and take care of managing all objects stored in the cluster's object store.
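For example, here is a rough sketch of that transparency (produce and consume are hypothetical tasks, assuming a local ray.init()): the result of one task can be fed to another task without the driver ever fetching the array itself.

import numpy as np
import ray

ray.init()

@ray.remote
def produce():
    # The (large) result is stored by Ray on the node where this task ran.
    return np.zeros(10**7)

@ray.remote
def consume(arr):
    # Ray resolves the ObjectRef before calling this function, so `arr`
    # is already a (read-only) numpy array here.
    return arr.sum()

ref = produce.remote()                 # an ObjectRef; the array stays in the store
total = ray.get(consume.remote(ref))   # the driver never materialises the array
print(total)                           # 0.0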
Now onto your questions:
Q1: When does the data get serialised/deserialised, and does deserialising always create a copy?
Data is serialised once, when it enters the object store: on ray.put, or when a value is passed to or returned from a remote function. It is deserialised whenever a worker needs the value, i.e. on ray.get or when Ray resolves a task argument. Whether that creates a copy depends on the type: objects that support zero-copy deserialisation (such as numpy arrays) are handed to the worker as a read-only view over the shared-memory buffer, so no copy of the data is made; most other Python objects are rebuilt in the worker's own heap, which does create a copy.
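A minimal sketch of that difference, assuming a local ray.init() (probe is just an illustrative task, and the exact size threshold for shared-memory storage is an internal Ray default):

import numpy as np
import ray

ray.init()

@ray.remote
def probe(arr, lst):
    # The numpy array arrives as a zero-copy, read-only view over the
    # shared-memory buffer; the list is rebuilt in this worker's own heap.
    lst.append("local change")      # fine: this is the worker's private copy
    return arr.flags.writeable, len(lst)

arr_ref = ray.put(np.zeros(10**6))
lst_ref = ray.put(list(range(1000)))

print(ray.get(probe.remote(arr_ref, lst_ref)))   # (False, 1001)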
Q2: When an object that has already been deserialised gets passed to a worker, will it be via another copy or that exact same object?
Objects in the Ray Object Store are immutable (except for Actors, which are a special kind of object). When Ray shares an object with another worker, it can do so safely because it knows the object will not change (actors, on the other hand, always live in a single worker process and are never copied to multiple workers).
In short: You can't modify the objects in the Ray Object Store. If you want an updated version of an object, you'll need to create a new object.
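A small sketch of that workflow, assuming a local ray.init() and an array large enough to live in the shared-memory store (the error message shown comes from numpy):

import numpy as np
import ray

ray.init()

ref_v1 = ray.put(np.zeros(10**6))

arr = ray.get(ref_v1)
try:
    arr[0] = 1.0                  # the shared-memory view is read-only
except ValueError as err:
    print(err)                    # e.g. "assignment destination is read-only"

# To "update" the object, take a private copy, modify it, and put the new
# version into the store under a new ObjectRef; ref_v1 keeps the old data.
new_arr = arr.copy()
new_arr[0] = 1.0
ref_v2 = ray.put(new_arr)
print(ray.get(ref_v2)[0])         # 1.0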
Q3: Some tutorials/answers seem to indicate that the overhead of deserialising and sharing data between workers is very different depending on the type of data (Numpy versus non-Numpy) so what are the details there?
Some data is designed to have a very similar representation in memory and in serialized format. For example, Arrow objects only need to be 'cast' into a byte stream and can be shared without performing any special computation. Numpy data is likewise laid out in memory as a C array that can simply be 'cast' to a byte buffer (Python lists, on the other hand, are arrays of references, so you need to serialize the object behind each reference).
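A rough way to see the difference, assuming a local ray.init(); the absolute timings vary a lot by machine, but the gap between the two should be clear:

import time
import numpy as np
import ray

ray.init()

arr = np.zeros(10**6)            # one contiguous ~8 MB C buffer
lst = arr.tolist()               # one million separate Python float objects

t0 = time.perf_counter()
ray.put(arr)
t1 = time.perf_counter()
ray.put(lst)
t2 = time.perf_counter()

# The array is handed over essentially as one big copy of its buffer,
# while the list has to be pickled element by element, so the second
# number is usually significantly larger.
print(f"numpy: {t1 - t0:.4f}s, list: {t2 - t1:.4f}s")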
Other kinds of data require more computation to be serialized. For example, if you need to serialize a Python function along with its closure, it may be very slow. Consider the function below: to serialize it you need to serialize not only the function itself but also all of the variables it accesses from its enclosing context (e.g. MAX_ELEMENTS).
MAX_ELEMENTS = 10

def batch_elements(input):
    arr = []
    for elm in input:
        arr.append(elm)
        if len(arr) > MAX_ELEMENTS:
            yield arr
            arr = []
    if arr:
        yield arr
I hope that helps - I'm happy to dive further into this.
Upvotes: 6
Reputation: 355
Ray runs a Redis server internally to share data across processes.
If you want to know more about it: Redis opens a port on localhost to get/put data and communicates with multiple processes. All data basically has to be a "string" or a "list of strings", so Ray also implements the serialization/deserialization to and from Redis.
Upvotes: 1