sstevan

Reputation: 487

Python3 Windows multiprocessing passing socket to process

I'm trying to get a multiprocessing ServerApp working on Windows. I think the issue is that os.fork() is not available there, so I'll have to pass the socket to the child process somehow, but sockets are not picklable (?!).

I've seen that this might be possible using reduce_handle and rebuild_handle from multiprocessing.reduction, as shown here, but those functions are not available in Python 3 (?!). I do have duplicate and steal_handle available, but I can't find an example of how to use them, or tell whether I need them at all.

Also, I'd like to know whether logging is going to be a problem when creating a new process.

Here's my ServerApp sample:

import logging
import socket

from select import select
from threading import Thread
from multiprocessing import Queue
from multiprocessing import Process
from sys import stdout
from time import sleep


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


    def conn_handler(self, connection, address, buffer):

        self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1])

        try:
            while True:

                command = None
                received_data = b''
                readable, writable, exceptional = select([connection], [], [], 0)  # Check for client commands

                if readable:
                    # Get Command  ... There is more code here
                    command = 'Something'


                if command == 'Something':
                    connection.sendall(command_response)
                else:
                    print(':(')

        except Exception as e:
            print(e)
        finally:
            connection.close()
            self.client_buffers.remove(buffer)
            self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1])


    def join(self):

        while self.listener.is_alive():
            self.listener.join(0.5)


    def acceptor(self):

        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process to handle it.
            conn, address = self.socket.accept()

            # Create a Queue to act as the buffer for this client and add it to the list of all client buffers
            buffer = Queue()
            self.client_buffers.append(buffer)

            process = Process(target=self.conn_handler, args=(conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process has its own reference.
            conn.close()


    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048):

        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []


    def run(self):

        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        self.listener = Thread(target=self.acceptor)  # Run acceptor thread to handle new connection
        self.listener.daemon = True
        self.listener.start()

Upvotes: 2

Views: 2778

Answers (1)

Thomas Moreau

Reputation: 4467

To allow connection pickling (including sockets) in Python 3, you should call multiprocessing.allow_connection_pickling(). It registers reducers for sockets in ForkingPickler. For instance:

import socket
import multiprocessing as mp
mp.allow_connection_pickling()


def _test_connection(conn):
    msg = conn.recv(2)
    conn.send(msg)
    conn.close()
    print("ok")

if __name__ == '__main__':
    server, client = socket.socketpair()

    p = mp.Process(target=_test_connection, args=(server,))
    p.start()

    client.settimeout(5)

    msg = b'42'
    client.send(msg)
    assert client.recv(2) == msg

    p.join()
    assert p.exitcode == 0

    client.close()
    server.close()

I also noticed that you have some other issues unrelated to the pickling of the socket.

  • When you use self.conn_handler as the target, multiprocessing will try to pickle the entire object self. This is an issue because your object holds a Thread, which cannot be pickled. You should therefore remove self from the closure of your target function. This can be done by using the @staticmethod decorator and removing every mention of self in the function.

  • Also, the logging module is not designed to handle multiple processes. Basically, all the logs from the launched Process will be lost with your current code. To fix that, you can either set up logging again once the second Process has started (at the beginning of conn_handler) or use the multiprocessing logging utility.
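To make the first point concrete, here is a minimal sketch of why a bound method fails to pickle while a static method does not. ServerSketch, bound_handler, static_handler and can_pickle are hypothetical names used only for this illustration; they are not part of the question's code or of multiprocessing.

```python
import pickle
import threading


class ServerSketch:
    """Hypothetical stand-in for ServerApp: it holds a Thread, like self.listener."""

    def __init__(self, id):
        self.id = id
        # An unstarted thread is enough: its internal locks cannot be pickled.
        self.listener = threading.Thread(target=print)

    def bound_handler(self, address):
        return "[%d] handling %s" % (self.id, address)

    @staticmethod
    def static_handler(id, address):
        return "[%d] handling %s" % (id, address)


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


app = ServerSketch(0)
# Pickling a bound method also pickles the instance it is bound to,
# including the Thread stored on it.
print("bound method picklable: ", can_pickle(app.bound_handler))
# A static method is pickled by reference to its qualified name only.
print("static method picklable:", can_pickle(app.static_handler))
```

With the static version, everything the handler needs (here id and address) has to be passed explicitly through args, which is what the rewritten ServerApp does.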

This can give something like this:

import logging
import socket

from select import select
from threading import Thread
from multiprocessing import util, get_context
from sys import stdout
from time import sleep

util.log_to_stderr(logging.INFO)  # same value as the literal 20
ctx = get_context("spawn")


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp',
                buffer_size=2048):

        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []

    @staticmethod
    def conn_handler(id, connection, address, buffer):

        print("test")
        util.info("[%d] - Connection from %s:%d", id, address[0], address[1])

        try:
            while True:

                command = None
                received_data = b''
                # Check for client commands
                readable, writable, exceptional = select([connection], [], [],
                                                        0)

                if readable:
                    # Get Command  ... There is more code here
                    command = 'Something'

                if command == 'Something':
                    connection.sendall(b"Coucouc")
                    break
                else:
                    print(':(')
                sleep(.1)

        except Exception as e:
            print(e)
        finally:
            connection.close()
            util.info("[%d] - Connection from %s:%d has been closed.", id,
                    address[0], address[1])
            print("Close")

    def join(self):

        while self.listener.is_alive():
            self.listener.join(0.5)

    def acceptor(self):

        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id,
                            self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process
            # to handle it.
            conn, address = self.socket.accept()

            # Create a Queue to act as the buffer for this client and
            # add it to the list of all client buffers
            buffer = ctx.Queue()
            self.client_buffers.append(buffer)

            process = ctx.Process(target=self.conn_handler,
                                args=(self.id, conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process
            # has its own reference.
            conn.close()

    def run(self):

        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        # Run acceptor thread to handle new connection
        self.listener = Thread(target=self.acceptor)
        self.listener.daemon = True
        self.listener.start()

        self.listener.join()


def main():
    app = ServerApp(0)
    app.run()


if __name__ == '__main__':
    main()

I only tested it on Unix with Python 3.6, but the behavior should not be too different, as I use the spawn context, which should behave like Process on Windows.
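As a rough sketch of the other logging option mentioned above (setting logging up again at the beginning of the target function), something like the following could be used together with the spawn context. configure_child_logging, worker and main are hypothetical names for this illustration, and the format string is only an assumption.

```python
import logging
import multiprocessing as mp
import sys


def configure_child_logging(level=logging.INFO):
    """Attach one stdout handler to this process's root logger (hypothetical helper)."""
    root = logging.getLogger()
    root.setLevel(level)
    if not root.handlers:  # avoid stacking duplicate handlers on repeated calls
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(processName)s %(levelname)s %(message)s"))
        root.addHandler(handler)
    return root


def worker(id):
    # Under "spawn" the child starts from a fresh interpreter, so logging
    # is configured here, at the very beginning of the target function.
    log = configure_child_logging()
    log.info("[%d] - worker started", id)


def main():
    # "spawn" is available on both Unix and Windows.
    ctx = mp.get_context("spawn")
    process = ctx.Process(target=worker, args=(0,))
    process.start()
    process.join()
    return process.exitcode
```

To try it, call main() under an if __name__ == '__main__': guard, as in the code above; the guard matters with spawn because the child re-imports the main module.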

Upvotes: 4
