Eugene S

Reputation: 3122

Python multiprocessing connection recv_bytes not returning data

I'm trying to use the multiprocessing module to implement a simple network traffic forwarder.

My application listens on a port, and when it receives an inbound connection it makes an outgoing TCP connection to another server and then shuttles the data back and forth between the two connections.

I've been testing my code with the nc utility, but it seems that my application enters the recv_bytes() call and blocks even though there is data available.

Here's a simplified version of my forwarding code (tested and fully runnable):

from multiprocessing import Process
from multiprocessing.connection import Listener, Client, wait

def start_serving(listen_port, outbound_port):
    with Listener(('', listen_port)) as server:
        print(f"Waiting for connections on port {listen_port}")

        with server.accept() as inbound_conn:
            print(f"Connection accepted from {server.last_accepted}")

            outbound_conn = Client(('localhost', outbound_port))
            print(f"Connected to port {outbound_port}")

            readers = [inbound_conn, outbound_conn]

            print(f"inbound_reader = {inbound_conn}")
            print(f"outbound_reader = {outbound_conn}")

            while readers:
                for r in wait(readers):
                    try:
                        print(f"Calling recv_bytes with reader {r}")
                        data = r.recv_bytes() # This blocks even when there's data
                        print(f"Out of recv_bytes with reader {r}")
                    except EOFError:
                        readers.remove(r)
                    else:
                        fwd_to_conn = None
                        if r is inbound_conn:
                            fwd_to_conn = outbound_conn
                            print("read from inbound connection")
                        elif r is outbound_conn:
                            fwd_to_conn = inbound_conn  # data from the outbound side goes back to the inbound side
                            print("read from outbound connection")

                        if fwd_to_conn is not None:
                            print(f"Forwarding {len(data)} bytes")
                            fwd_to_conn.send_bytes(data)

forwarder = Process(target=start_serving, daemon=True, args=(19001, 19002))
forwarder.start()
forwarder.join()

I run this script, and in separate consoles I run:

$ echo "Hi from outbound" | nc -l -p 19002

and

$ echo "Hi from inbound" | nc localhost 19001

This is the output I get:

Waiting for connections on port 19001
Connection accepted from ('127.0.0.1', 56874)
Connected to port 19002
inbound_reader = <multiprocessing.connection.Connection object at 0x7ff3730e2850>
outbound_reader = <multiprocessing.connection.Connection object at 0x7ff3730e2a60>
Calling recv_bytes with reader <multiprocessing.connection.Connection object at 0x7ff3730e2850>

As you can see, the application is blocked inside the recv_bytes() call on the inbound connection even though there is data there.

Seems like a very simple application, so I'm hoping there's an obvious solution here.

EDIT

Digging into the multiprocessing.connection.Connection code a bit, it seems that recv_bytes() expects the size of the message to be in the first 4 bytes. However, I don't want it to assume any kind of format; I just want it to read as much data as is available without trying to interpret it.
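
To illustrate why the call would never return, here is a minimal sketch of how a length-prefixed reader would interpret the first bytes of the nc input. It assumes the header is a big-endian signed 32-bit integer (the "!i" struct format), which is an assumption about the implementation rather than a documented guarantee, and claimed_length is a hypothetical helper name used only for this illustration.

```python
import struct


def claimed_length(raw: bytes) -> int:
    """Interpret the first 4 bytes of raw as a big-endian signed 32-bit length.

    Assumption: this mirrors the header a length-prefixed reader expects.
    """
    (size,) = struct.unpack("!i", raw[:4])
    return size


payload = b"Hi from inbound\n"  # what `echo "Hi from inbound" | nc ...` sends
print(claimed_length(payload))  # the bytes b"Hi f" read as a message length
```

Under that assumption, the text "Hi f" decodes to a length of more than a billion bytes, so the reader keeps waiting for a message body that never arrives.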

Thanks!

Upvotes: 2

Views: 806

Answers (1)

Aaron

Reputation: 11075

It seems like you don't actually want or need the functionality of mp.Pipe or mp.connection.Connection objects, so it makes sense to skip them and use socket.socket directly, perhaps together with socketserver (everything could be done with socket alone, but socketserver has a good base of functionality pre-written for you).

Here's an example (with a decent amount of copy-paste from the docs) of listening with a socketserver, and in the connection handler, creating a new connection to another port to forward on the data.

import socketserver
import socket

class MyTCPServer(socketserver.TCPServer):
    def __init__(self, *args, outbound_port=None, **kwargs):
        self.outbound_port = outbound_port
        super().__init__(*args, **kwargs)

class MyTCPHandler(socketserver.BaseRequestHandler):

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())
        
        #also forward the data to a second server (nc)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Connect to server and send data
            sock.connect(("localhost", self.server.outbound_port))
            sock.sendall(self.data.upper() + b"\n")
        
            # Receive data from the server and shut down
            received = sock.recv(1024)
            print(received)

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    # Create the server, binding to localhost on port 9999
    with MyTCPServer((HOST, PORT), MyTCPHandler, outbound_port=9998) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.serve_forever()

Using the same nc commands (with different ports) should, I believe, give the outputs you expect.
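
If you need data to flow in both directions at once, as in the question, one possible approach is to wait on both sockets and copy whatever arrives to the other side. The following is only a sketch under stated assumptions: relay and forward_once are hypothetical names, a single connection is handled, and errors such as a reset connection are not handled.

```python
import selectors
import socket


def relay(a, b, bufsize=4096):
    """Copy raw bytes between two connected sockets until both sides reach EOF."""
    peers = {a: b, b: a}
    with selectors.DefaultSelector() as sel:
        sel.register(a, selectors.EVENT_READ)
        sel.register(b, selectors.EVENT_READ)
        open_readers = 2
        while open_readers:
            for key, _ in sel.select():
                src = key.fileobj
                data = src.recv(bufsize)  # returns whatever is available, no framing
                if data:
                    peers[src].sendall(data)
                else:
                    # EOF on this side: stop reading it and tell the peer
                    # that no more data is coming.
                    sel.unregister(src)
                    open_readers -= 1
                    try:
                        peers[src].shutdown(socket.SHUT_WR)
                    except OSError:
                        pass  # the peer may already be closed


def forward_once(listen_port, outbound_port):
    """Accept one inbound connection and relay it to localhost:outbound_port."""
    with socket.create_server(("", listen_port)) as server:
        inbound, _addr = server.accept()
        with inbound, socket.create_connection(("localhost", outbound_port)) as outbound:
            relay(inbound, outbound)
```

Calling forward_once(19001, 19002) would then correspond to the setup in the question, with the nc listener started first. Unlike recv_bytes(), socket.recv() makes no assumption about message boundaries, which is what a plain forwarder needs.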

If you do in fact want the functionality of sending more structured data than raw bytes, here's an example of a similar layout but with mp.connection:

import multiprocessing as mp
from multiprocessing.connection import Listener, Client
import sys
        
def server_A():
    listen_port, outbound_port = 9999, 9998
    with Listener(('', listen_port)) as listener:
        while True:  #serve forever (until keyboardinterrupt) right in main...
            try:
                conn1 = listener.accept()
            except mp.AuthenticationError:
                print("auth error")
                continue #no connection was established, so wait for the next one
            except KeyboardInterrupt:
                break
            obj = conn1.recv() #get from client
            forward = ("object received: server A", obj)
            with Client(('', outbound_port)) as conn2:
                conn2.send(forward) #forward to B
                response = conn2.recv() #recv from B
            
            conn1.send(response) #send response from B back to Client
            print(conn1.recv_bytes())
            print("but sending bytes can have an overhead benefit for large blocks of data (arrays for example)")
            conn1.close() #because we're not using contextmanager for conn1
            
def server_B():
    listen_port  = 9998
    with Listener(('', listen_port)) as listener:
        while True:  #serve forever (until keyboardinterrupt) right in main...
            try:
                conn = listener.accept()
            except mp.AuthenticationError:
                print("auth error")
                continue #no connection was established, so wait for the next one
            except KeyboardInterrupt:
                break
            
            obj = conn.recv()
            response = ("object received: server B", obj)
            conn.send(response)
            
            conn.close() #because we're not using contextmanager

def client():
    with Client(('', 9999)) as conn:
        conn.send({"some":["data"]}) #connections are useful for sending more structured data than bytes. 
        print(conn.recv())
        conn.send_bytes(b"binary data isn't sent totally raw because there's a \"protocol\" for communication")

if __name__ == "__main__":
    if "A" in sys.argv[1]:
        server_A()
    elif "B" in sys.argv[1]:
        server_B()
    elif "C" in sys.argv[1]:
        client()

*Note: both examples were tested on Ubuntu 20.04 with Python 3.8.5.
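
As a small illustration of the "protocol" mentioned in the client code above, the sketch below sends several byte strings through a multiprocessing Pipe and reads them back. Each recv_bytes() call returns exactly one message as it was sent, rather than an arbitrary chunk of a byte stream. The roundtrip helper is a hypothetical name, and the example assumes messages small enough not to fill the pipe's buffer.

```python
from multiprocessing import Pipe


def roundtrip(messages):
    """Send each bytes message through a Pipe and read them back one by one."""
    sender, receiver = Pipe()
    try:
        for message in messages:
            sender.send_bytes(message)
        # One recv_bytes() call per message: boundaries are preserved.
        return [receiver.recv_bytes() for _ in messages]
    finally:
        sender.close()
        receiver.close()


print(roundtrip([b"one", b"two"]))
```

This message framing is convenient between two Connection objects, but it is also why a Connection cannot read input from a tool like nc that sends unframed bytes.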

Upvotes: 2
