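I'm trying to write a class that buffers data which takes a while to read in and which needs to be updated periodically. The Python version is 3.7. There are three criteria I would like the class to satisfy:
Manual update: An instance of the class should have an 'update' function, which reads in new data.
Automatic update: An instance's update method should be periodically run, so the buffered data never gets too old. As reading takes a while, I'd like to do this without blocking the main process.
Self contained: Users should be able to inherit from the class and overwrite the method for refreshing data, i.e. the automatic updating should work out of the box.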
I've tried having instances create their own subprocess for running the updates. This causes problems because simply passing the instance to another process seems to create a copy, so the desired instance is not updated automatically.
Below is an example of the approach I'm trying. Can anyone help getting the automatic update to work?
import multiprocessing as mp
import random
import time

def refresh_helper(buffer, lock):
    """Periodically calls refresh method in a buffer instance."""
    while True:
        # A Lock is itself a context manager; lock.acquire() returns a bool
        with lock:
            buffer._refresh_data()
        time.sleep(10)

class Buffer:
    def __init__(self):
        # Set up a helper process to periodically update data
        self.lock = mp.Lock()
        self.proc = mp.Process(target=refresh_helper, args=(self, self.lock), daemon=True)
        self.proc.start()
        # Do an initial update
        self.data = None
        self.update()

    def _refresh_data(self):
        """Pretends to read in some data. This would take a while for real data"""
        numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
        data = [random.choice(numbers) for _ in range(3)]
        self.data = data

    def update(self):
        with self.lock:
            self._refresh_data()

    def get_data(self):
        return self.data

if __name__ == '__main__':
    buffer = Buffer()
    data_first = buffer.get_data()
    time.sleep(11)
    data_second = buffer.get_data()  # should be different from first
Upvotes: 1
All processes exist in different areas of memory from one another, each of which is meant to be fully separate from all others. As you pointed out, the additional process creates a copy of the instance on which it operates, meaning the updated version exists in a separate memory space from the instance you're running get_data() on. Because of this, there is no easy way to perform this operation on this specific instance from a different process.
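The copy behaviour described above can be seen in a few lines. This is only a minimal sketch: Holder and overwrite are hypothetical names standing in for the question's Buffer and its refresh helper.

```python
import multiprocessing as mp


class Holder:
    """Tiny stand-in for the question's Buffer (hypothetical name)."""

    def __init__(self):
        self.data = [0, 0, 0]


def overwrite(holder):
    """Runs in the child: rebinds an attribute on the child's own copy."""
    holder.data = [1, 2, 3]


if __name__ == "__main__":
    holder = Holder()
    proc = mp.Process(target=overwrite, args=(holder,))
    proc.start()
    proc.join()
    # The child changed only its copy; the parent's object is untouched.
    print(holder.data)  # [0, 0, 0]
```

Calling overwrite(holder) directly in the parent does change holder.data, which is why the same code appears to work until it is moved into a separate process.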
Given that you want updating the data not to block reading it, threading may not be enough: in CPython the global interpreter lock lets only one thread execute Python bytecode at a time within a process, so a CPU-bound read would still stall the main thread (a read that mostly waits on I/O releases the lock and is less affected). Instead, you can use an object that lives in memory shared between all processes, such as a multiprocessing.Value or a multiprocessing.Array, both of which store ctypes objects. Both of these exist in Python 3.7 (appropriate documentation attached.)
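As a rough sketch of the shared-memory idea, assuming the buffered data fits in a fixed-length array of C ints: refresh and snapshot are hypothetical helper names, and the random numbers stand in for the slow read.

```python
import multiprocessing as mp
import random
import time


def refresh(shared, period, rounds):
    """Child-side loop: overwrite the shared array in place."""
    for _ in range(rounds):
        new_values = [random.randint(1, 9) for _ in range(len(shared))]
        with shared.get_lock():  # mp.Array is wrapped in a lock by default
            shared[:] = new_values
        time.sleep(period)


def snapshot(shared):
    """Copy the current contents into an ordinary list."""
    with shared.get_lock():
        return list(shared[:])


if __name__ == "__main__":
    shared = mp.Array("i", 3)  # three C ints, initialised to 0
    proc = mp.Process(target=refresh, args=(shared, 0.1, 5), daemon=True)
    proc.start()
    time.sleep(0.3)
    print(snapshot(shared))  # values written by the child process
    proc.join()
```

Because the array lives in shared memory, the parent sees the child's writes without anything being sent back explicitly. The trade-off is that only ctypes-compatible data of a fixed size can be stored this way.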
If this approach does not work, consider examining these similar threads:
Sharing a complex object between processes?
multiprocessing: sharing a large read-only object between processes?
Good luck with your project!
Upvotes: 0
Here is an approach that makes use of a multiprocessing queue. It's similar to what you had implemented, but your implementation was trying to assign to self within Buffer._refresh_data in both processes. Because self refers to a different Buffer object in each process, they did not affect each other.
To send data from one process to another you need to use shared memory, pipes, or some other such mechanism. Python's multiprocessing library provides multiprocessing.Queue, which simplifies this for us.
To send data from the refresh helper to the main process we need only use queue.put in the helper process, and queue.get in the main process. The data being sent must be serializable using Python's pickle module to be sent between the processes through a multiprocessing.Queue.
Using a multiprocessing.Queue also saves us from having to use locks ourselves, since the queue handles that internally.
To handle the helper process starting and stopping cleanly for the example, I have added __enter__ and __exit__ methods to make Buffer into a context manager. They can be removed if you would rather manually stop the helper process.
I have also changed your _refresh_data method into _get_new_data, which returns new data half the time, and has no new data to give the other half of the time (i.e. it returns None). This was done to make it more similar to what I imagine a real application for this class would be.
It is important that only static/class methods or external functions are called from the other process, as otherwise they may operate on a self attribute that refers to a completely different instance. The exception is if the attribute is meant to be sent across the process barrier, like with self.queue. That is why the update method can use self.queue to send data to the main process despite self being a different Buffer instance in the other process.
The method get_next_data will return the oldest item found in the queue. If there is nothing in the queue, it will wait until something is added to the queue. You can change this behaviour by giving the call to self.queue.get a timeout (which raises queue.Empty if it times out), or by using self.queue.get_nowait (which raises queue.Empty immediately if the queue is empty, instead of waiting).
from __future__ import annotations
import multiprocessing as mp
import random
import time

class Buffer:
    def __init__(self):
        self.queue = mp.Queue()
        self.proc = mp.Process(target=self._refresh_helper, args=(self,))
        self.update()

    def __enter__(self):
        self.proc.start()
        return self

    def __exit__(self, ex_type, ex_val, ex_tb):
        self.proc.kill()
        self.proc.join()

    @staticmethod
    def _refresh_helper(buffer: "Buffer", period: float = 1.0) -> None:
        """Periodically calls refresh method in a buffer instance."""
        while True:
            buffer.update()
            time.sleep(period)

    @staticmethod
    def _get_new_data() -> list[int] | None:
        """Pretends to read in some data. This would take a while for real data"""
        if random.randint(0, 1):
            return random.choices(range(10), k=3)
        return None

    def update(self) -> None:
        new_data = self._get_new_data()
        if new_data is not None:
            self.queue.put(new_data)

    def get_next_data(self):
        return self.queue.get()

if __name__ == '__main__':
    with Buffer() as buffer:
        for _ in range(5):
            print(buffer.get_next_data())
Running this code will, as an example, start the helper process, then print out the first 5 pieces of data it gets from the buffer. The first one will be from the update that is performed when the buffer is initialized. The others will all be provided by the helper process running update.
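If blocking in get_next_data is not wanted, the timeout and get_nowait variants mentioned earlier could be wrapped roughly like this. get_or_default is a hypothetical helper, not part of the class above; both variants signal an empty queue by raising queue.Empty.

```python
import multiprocessing as mp
import queue


def get_or_default(q, timeout=None, default=None):
    """Return the next item, or `default` if none arrives in time.

    timeout=None means "do not wait at all".
    """
    try:
        if timeout is None:
            return q.get_nowait()
        return q.get(timeout=timeout)
    except queue.Empty:
        return default


if __name__ == "__main__":
    q = mp.Queue()
    print(get_or_default(q))               # None: the queue is empty
    q.put([1, 2, 3])
    print(get_or_default(q, timeout=1.0))  # [1, 2, 3]
```

Returning a default keeps the calling code simple, but it cannot tell "no data yet" apart from data that happens to equal the default, so a dedicated sentinel may be a better choice in real code.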
Let's review your criteria:
Manual update: An instance of the class should have an 'update' function, which reads in new data.
The Buffer.update method can be used for this.
Automatic update: An instance's update method should be periodically run, so the buffered data never gets too old. As reading takes a while, I'd like to do this without blocking the main process.
This is done by a helper process which adds data to a queue for later processing. If you would rather throw away old data, and only process the newest data, then the queue can be swapped out for a multiprocessing.Array, or whatever other multiprocessing compatible shared memory wrapper you prefer.
Self contained: Users should be able to inherit from the class and overwrite the method for refreshing data, i.e. the automatic updating should work out of the box.
This works by overriding the _get_new_data method in a subclass. So long as it's a static or class method which returns the data, automatic updating should work with it without any changes.
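To illustrate the third criterion, here is a condensed sketch of the queue-based class with a subclass that only swaps the data source. ConstantBuffer, the period argument on __init__, the daemon flag, and the base class returning None by default are assumptions made for this example, not part of the code above.

```python
import multiprocessing as mp
import time


class Buffer:
    """Condensed version of the queue-based Buffer (details simplified)."""

    def __init__(self, period=1.0):
        self.queue = mp.Queue()
        self.proc = mp.Process(
            target=self._refresh_helper, args=(self, period), daemon=True
        )
        self.update()  # initial manual update

    def __enter__(self):
        self.proc.start()
        return self

    def __exit__(self, ex_type, ex_val, ex_tb):
        self.proc.kill()
        self.proc.join()

    @staticmethod
    def _refresh_helper(buffer, period):
        """Runs in the helper process and feeds the queue periodically."""
        while True:
            buffer.update()
            time.sleep(period)

    @staticmethod
    def _get_new_data():
        """Override in subclasses; return None when there is nothing new."""
        return None

    def update(self):
        new_data = self._get_new_data()
        if new_data is not None:
            self.queue.put(new_data)

    def get_next_data(self):
        return self.queue.get()


class ConstantBuffer(Buffer):
    """Hypothetical subclass: only the data source changes."""

    @staticmethod
    def _get_new_data():
        return [7, 7, 7]


if __name__ == "__main__":
    with ConstantBuffer(period=0.1) as buffer:
        for _ in range(3):
            print(buffer.get_next_data())  # [7, 7, 7] each time
```

The subclass does not touch the process or queue handling. Because update looks up _get_new_data through self, both the manual update and the helper process pick up the overridden method.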
Upvotes: 3