Leon
Leon

Reputation: 6554

Share list between process in python server

I have simple UDPServer, which works with multiprocessing.

I want to create a list, that contains information about all clients.

I use Manager, but I don't understand, how to append information in list - I need transfer Manager`s object to handle, but how? My way with new attribute does not work.

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):

    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()

How to fix that? Thanks!

Output:

Exception happened during processing of request from ('127.0.0.1', 55679)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__
    self.handle()
  File "server.py", line 15, in handle
    ChatHandler.clients.append(self.client_address)
  File "<string>", line 2, in append
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod
    self._connect()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

Upvotes: 15

Views: 12652

Answers (5)

sezanzeb
sezanzeb

Reputation: 1139

If you can't use the manager for whatever reason, you can also implement one on your own that fits your needs.

My unittest was configured to stop all child processes that are left over if they are not properly shut down as expected, which destroyed the manager. So I needed something that can be arbitrarily started and stopped without bothering the tests.

import multiprocessing
import atexit
import select

class SharedDict:
    """Share a dictionary across processes."""
    def __init__(self):
        """Create a shared dictionary."""
        super().__init__()
        self.pipe = multiprocessing.Pipe()
        self.process = None
        atexit.register(self._stop)
        self._start()

    def _start(self):
        """Ensure the process to manage the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            return

        # if the manager has already been running in the past but stopped
        # for some reason, the dictionary contents are lost
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self):
        """Manage the dictionary, handle read and write requests."""
        shared_dict = dict()
        while True:
            message = self.pipe[0].recv()
            logger.spam('SharedDict got %s', message)

            if message[0] == 'stop':
                return

            if message[0] == 'set':
                shared_dict[message[1]] = message[2]

            if message[0] == 'get':
                self.pipe[0].send(shared_dict.get(message[1]))

    def _stop(self):
        """Stop the managing process."""
        self.pipe[1].send(('stop',))

    def get(self, key):
        """Get a value from the dictionary."""
        return self.__getitem__(key)

    def __setitem__(self, key, value):
        self.pipe[1].send(('set', key, value))

    def __getitem__(self, key):
        self.pipe[1].send(('get', key))

        # to avoid blocking forever if something goes wrong
        select.select([self.pipe[1]], [], [], 0.1)
        if self.pipe[1].poll():
            return self.pipe[1].recv()

        return None

    def __del__(self):
        self._stop()


shared_dict = SharedDict()

You can extend this with all sorts of methods, and you can stop and restart it whenever you like (though the dict will be lost each time). The pipes will remain the same all the time, so all child processes can also talk to the restarted manager without the need for new pipe fds.

I might extend this stuff with more functionality. If I didn't move that class into its own module in the meantime, it can be found at https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py

Upvotes: 0

You can use Python native library multiprocessing.SharedMemory

Or this:

import multiprocessing
manager = multiprocessing.Manager()
shared_list = manager.list()

def worker1(l):
    l.append(1)

def worker2(l):
    l.append(2)

process1 = multiprocessing.Process(
    target=worker1, args=[shared_list])
process2 = multiprocessing.Process(
    target=worker2, args=[shared_list])

process1.start()
process2.start()
process1.join()
process2.join()

print shared_list

Upvotes: 0

Devopsception
Devopsception

Reputation: 557

if you're using it anyway like the following way you might require to look at the length of list you're passing or hardcoded count of workers it might be exceeding your machine's capability:

        pool = Pool(len(somelist))
        # call the function 'somefunction' in parallel for each somelist.
        pool.map(somefunction, somelist)

i reduced the workers it resolved the issue for me.

Upvotes: 0

dano
dano

Reputation: 94961

The problem is that you're letting the main process finish its execution immediately after you start the worker process. When the process that created the multiprocessing.Manager finishes its execution, the Manager server gets shut down, which means your shared list object is now useless. This happens because the Manager object registers it's shutdown function as a "finalizer" with the multiprocessing module, which means it will be run just before the process exits. Here's the code that registers it, in BaseManager.__init__:

    # register a finalizer
    self._state.value = State.STARTED
    self.shutdown = util.Finalize(
        self, type(self)._finalize_manager,
        args=(self._process, self._address, self._authkey,
              self._state, self._Client),
        exitpriority=0
        )

Here's the code that actually does the shut down:

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass

The fix is simple - don't let the main process complete immediately you start the process that runs the UDP server, by calling server_process.join():

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):

    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    server_process.join() # This fixes the issue.

Upvotes: 21

johntellsall
johntellsall

Reputation: 15180

The following shows an example of a UDP server and a shared list.

  • parent code creates a Manager, a managed list, and passed it to start_server()

  • this function in turn actually starts the server, storing the shared list such that the server -- and its handler -- can access it

  • when a packet arrives, the handle() method is triggered. This accesses the server using self.server, and the shared list with self.server.client_list, an attribute on the ChatServer instance.

I did testing by starting the server, waiting a second, then sending a UDP packet "beer" using the netcat command. For some reason it sends Xs first, and each output is duplicated. This is a bug, but the code should point you in the right direction.

source

import multiprocessing as mp, signal, sys
from SocketServer import (
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        data,_socket = self.request
        curproc = mp.current_process()
        print '{}: {}'.format(
            curproc,
            dict(
                data_len=len(data), 
                data=data.strip(),
                client=self.client_address,
            ))
        self.server.client_list.append(
            self.client_address)
        print('{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        ))

class ChatServer(ForkingMixIn, UDPServer):
    client_list = None

def start_server(client_list):
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()

    signal.alarm(5)             # die in 5 seconds
    signal.pause()              # wait for control-C or alarm

test run

(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py

<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}

Upvotes: 0

Related Questions