J. Anderson
J. Anderson

Reputation: 309

Server process started by multiprocessing.Manager() makes piped socket not closed immediately

I have following code, server accept net connection, pass it to child to process with Manager().Queue():

q = Manager().Queue()

class Server:

    def run(self, host, port):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind((host, port))
        s.listen(1)

        print('parent', os.getpid())

        while True:
            c, _ = s.accept()
            q.put(c)
            c.close()

def handle_request():
    print('child', os.getpid())
    while True:
        c = q.get()
        time.sleep(1)
        print(c.recv(4))
        c.close()

Process(target=handle_request, args=()).start()
Server().run('127.0.0.1', 10000)   

close doesn't work as expected, I think it is because Manager's server process sill have a reference on that socket, lsof -i confirmed. How to deal with this? I found there is not a way to close the socket in Manager process, shutdown could do the trick but not what I want.

Upvotes: 2

Views: 655

Answers (1)

Hannu
Hannu

Reputation: 12205

Interesting problem.

I am not sure if this is of any help, but I found your code somewhat odd in the beginning, as sending socket objects using Manager().Queue() to another process does not sound like it is supported. It may be, but sending a file descriptor to another process needs a couple of hoops. I changed your code a bit to do it as I would do it - basically reducing and reconstructing handles.

from multiprocessing import Manager, Process
from multiprocessing.reduction import reduce_handle, rebuild_handle
import socket
import os
from time import sleep

q = Manager().Queue()


class Server:
    def run(self, host, port):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen(1)

        print('parent', os.getpid())

        while True:
            c, _ = s.accept()
            foo = reduce_handle(c.fileno())
            q.put(foo)
            c.close()


def handle_request():
    print('child', os.getpid())
    while True:
        bar = q.get()
        sleep(1)
        barbar = rebuild_handle(bar)
        c = socket.fromfd(barbar, socket.AF_INET, socket.SOCK_STREAM)
        print(c.recv(4))
        c.shutdown(socket.SHUT_RDWR)

Process(target=handle_request, args=()).start()
Server().run('127.0.0.1', 10000)

This does not leave any sockets behind in CLOSE_WAIT at least when I ran it, and it works as I would expect it to work.

Upvotes: 1

Related Questions