Reputation: 487
I'm trying to make multiprocessing ServerApp
to work on Windows. I guess the issue is missing os.fork()
feature so I'll have to pass socket
somehow which is not pickleable (?!).
I've seen that this might be possible using reduce_handle
and rebuild_handle
from multiprocessing.reduction
as shown here but those methods are not available in Python 3 (?!). Although I have available duplicate
and steal_handle
available I can't find an example how to use them or whether I need them at all.
Also, I'd like to know if logging
is going to be the problem when creating a new process?
Here's my ServerApp sample:
import logging
import socket
from select import select
from threading import Thread
from multiprocessing import Queue
from multiprocessing import Process
from sys import stdout
from time import sleep
class ServerApp(object):
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stdout)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def conn_handler(self, connection, address, buffer):
self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1])
try:
while True:
command = None
received_data = b''
readable, writable, exceptional = select([connection], [], [], 0) # Check for client commands
if readable:
# Get Command ... There is more code here
command = 'Something'
if command == 'Something':
connection.sendall(command_response)
else:
print(':(')
except Exception as e:
print(e)
finally:
connection.close()
self.client_buffers.remove(buffer)
self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1])
def join(self):
while self.listener.is_alive():
self.listener.join(0.5)
def acceptor(self):
while True:
self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port)
# Accept a connection on the bound socket and fork a child process to handle it.
conn, address = self.socket.accept()
# Create Queue which will represent buffer for specific client and add it o list of all client buffers
buffer = Queue()
self.client_buffers.append(buffer)
process = Process(target=self.conn_handler, args=(conn, address, buffer))
process.daemon = True
process.start()
self.clients.append(process)
# Close the connection fd in the parent, since the child process has its own reference.
conn.close()
def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048):
self.id = id
self.port = port
self.ip = ip
self.socket = None
self.listener = None
self.buffer_size = buffer_size
# Additional attributes here....
self.clients = []
self.client_buffers = []
def run(self):
# Create TCP socket, bind port and listen for incoming connections
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((self.ip, self.port))
self.socket.listen(5)
self.listener = Thread(target=self.acceptor) # Run acceptor thread to handle new connection
self.listener.daemon = True
self.listener.start()
Upvotes: 2
Views: 2778
Reputation: 4467
To allow connection pickling (including sockets) for python3, you should use the mulitprocessing.allow_connection_pickling
. It registers reducers for sockets in ForkingPickler
. For instance:
import socket
import multiprocessing as mp
mp.allow_connection_pickling()
def _test_connection(conn):
msg = conn.recv(2)
conn.send(msg)
conn.close()
print("ok")
if __name__ == '__main__':
server, client = socket.socketpair()
p = mp.Process(target=_test_connection, args=(server,))
p.start()
client.settimeout(5)
msg = b'42'
client.send(msg)
assert client.recv(2) == msg
p.join()
assert p.exitcode == 0
client.close()
server.close()
I also noticed that you have some other issues unrealted to the pickling of socket
.
When use self.conn_handler
as a target, the multiprocessing will try to pickle the entire object self
. This is an issue as your object contains some Thread
that cannot be pickled. You should thus remove self
from the closure of your target function. It can be done by using the @staticmethod
decorator and by removing all mention of self
in the function.
Also, the logging
module is not done to handle multiple processes. Basically, all the logs from the launched Process
will be lost with your current code. To fix that, you can either start a new logging
once you start the second Process
(at the beginning of conn_handler
) or use the multiprocessing
logging utility.
This can gives something like this:
import logging
import socket
from select import select
from threading import Thread
from multiprocessing import util, get_context
from sys import stdout
from time import sleep
util.log_to_stderr(20)
ctx = get_context("spawn")
class ServerApp(object):
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stdout)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp',
buffer_size=2048):
self.id = id
self.port = port
self.ip = ip
self.socket = None
self.listener = None
self.buffer_size = buffer_size
# Additional attributes here....
self.clients = []
self.client_buffers = []
@staticmethod
def conn_handler(id, connection, address, buffer):
print("test")
util.info("[%d] - Connection from %s:%d", id, address[0], address[1])
try:
while True:
command = None
received_data = b''
# Check for client commands
readable, writable, exceptional = select([connection], [], [],
0)
if readable:
# Get Command ... There is more code here
command = 'Something'
if command == 'Something':
connection.sendall(b"Coucouc")
break
else:
print(':(')
sleep(.1)
except Exception as e:
print(e)
finally:
connection.close()
util.info("[%d] - Connection from %s:%d has been closed.", id,
address[0], address[1])
print("Close")
def join(self):
while self.listener.is_alive():
self.listener.join(0.5)
def acceptor(self):
while True:
self.logger.info("[%d] - Waiting for connection on %s:%d", self.id,
self.ip, self.port)
# Accept a connection on the bound socket and fork a child process
# to handle it.
conn, address = self.socket.accept()
# Create Queue which will represent buffer for specific client and
# add it o list of all client buffers
buffer = ctx.Queue()
self.client_buffers.append(buffer)
process = ctx.Process(target=self.conn_handler,
args=(self.id, conn, address, buffer))
process.daemon = True
process.start()
self.clients.append(process)
# Close the connection fd in the parent, since the child process
# has its own reference.
conn.close()
def run(self):
# Create TCP socket, bind port and listen for incoming connections
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((self.ip, self.port))
self.socket.listen(5)
# Run acceptor thread to handle new connection
self.listener = Thread(target=self.acceptor)
self.listener.daemon = True
self.listener.start()
self.listener.join()
def main():
app = ServerApp(0)
app.run()
if __name__ == '__main__':
main()
I only tested it on Unix and python3.6 but it should not have behavior too different as I use the spawn context, which should behave like the
Process` in windows.
Upvotes: 4