I'm trying to use the multiprocessing module to implement a simple network traffic forwarder.
My application listens on a port, and when it receives an inbound connection it makes an outgoing TCP connection to another server and then shuttles the data back and forth between the two connections.
I've been trying to test my code using the nc utility, but it seems that my application enters the recv_bytes() call and blocks even though data is available.
Here's a simplified version of my forwarding code (tested; fully runnable):
from multiprocessing import Process
from multiprocessing.connection import Listener, Client, wait


def start_serving(listen_port, outbound_port):
    with Listener(('', listen_port)) as server:
        print(f"Waiting for connections on port {listen_port}")
        with server.accept() as inbound_conn:
            print(f"Connection accepted from {server.last_accepted}")
            outbound_conn = Client(('localhost', outbound_port))
            print(f"Connected to port {outbound_port}")
            readers = [inbound_conn, outbound_conn]
            print(f"inbound_reader = {inbound_conn}")
            print(f"outbound_reader = {outbound_conn}")
            while readers:
                for r in wait(readers):
                    try:
                        print(f"Calling recv_bytes with reader {r}")
                        data = r.recv_bytes()  # This blocks even when there's data
                        print(f"Out of recv_bytes with reader {r}")
                    except EOFError:
                        readers.remove(r)
                    else:
                        fwd_to_conn = None
                        if r is inbound_conn:
                            fwd_to_conn = outbound_conn
                            print("read from inbound connection")
                        elif r is outbound_conn:
                            fwd_to_conn = inbound_conn  # send replies back to the client
                            print("read from outbound connection")
                        if fwd_to_conn is not None:
                            print(f"Forwarding {len(data)} bytes")
                            fwd_to_conn.send_bytes(data)


if __name__ == "__main__":
    forwarder = Process(target=start_serving, daemon=True, args=(19001, 19002))
    forwarder.start()
    forwarder.join()
I run this script and, in separate consoles, I run:
$ echo "Hi from outbound" | nc -l -p 19002
and
$ echo "Hi from inbound" | nc localhost 19001
This is the output I get:
Waiting for connections on port 19001
Connection accepted from ('127.0.0.1', 56874)
Connected to port 19002
inbound_reader = <multiprocessing.connection.Connection object at 0x7ff3730e2850>
outbound_reader = <multiprocessing.connection.Connection object at 0x7ff3730e2a60>
Calling recv_bytes with reader <multiprocessing.connection.Connection object at 0x7ff3730e2850>
As you can see, the application is blocked inside the recv_bytes() call on the inbound connection even though data is available.
This seems like a very simple application, so I'm hoping there's an obvious solution here.
EDIT
Digging into the multiprocessing.connection.Connection code a bit, it seems that recv_bytes() expects the size of the message to be in the first 4 bytes. However, I don't want it to assume any kind of format; I just want it to read as much data as is available without trying to interpret it.
Thanks!
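The 4-byte length prefix mentioned in the EDIT also explains the hang. Below is a minimal sketch, assuming a Unix-like platform and CPython's current implementation detail that, for messages under 2 GiB, the header is a signed big-endian 32-bit integer (struct format "!i"). Wrapping a raw socket file descriptor in multiprocessing.connection.Connection is likewise an assumption about internals, not documented public usage. The first four bytes of nc's unframed text get read as an enormous length, so recv_bytes() keeps waiting for data that never arrives.

```python
import socket
import struct
from multiprocessing.connection import Connection

# What `echo "Hi from inbound" | nc localhost 19001` sends: plain bytes, no header.
raw = b"Hi from inbound\n"

# recv_bytes() reads the first 4 bytes as a signed big-endian length ("!i").
bogus_length = struct.unpack("!i", raw[:4])[0]
print(f"recv_bytes() would wait for {bogus_length} bytes")

# Framed the way Connection.send_bytes() frames small messages,
# the same payload comes back intact.
a, b = socket.socketpair()
conn = Connection(b.detach())  # assumption: wrapping a raw socket fd (Unix-like OS)
a.sendall(struct.pack("!i", len(raw)) + raw)
received = conn.recv_bytes()
print(received)

a.close()
conn.close()
```

On the unframed input, the first print reports 1214849126 bytes (the ASCII codes of "Hi f" read as a single integer), which is why the call in the question never returns.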
It seems like you don't actually want or need the functionality of mp.Pipe or mp.connection.Connection objects, so here it makes sense to skip them and just use socket.socket, and maybe socketserver (everything it does could be done with plain socket, but it gives you a good base of pre-written functionality).
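Applied to the forwarder from the question, dropping Connection in favour of plain sockets could look like the minimal sketch below. It assumes Python 3.8+ (for socket.create_server and the walrus operator) and uses the selectors module where the question used multiprocessing.connection.wait; pump, serve_once and demo are hypothetical names. Because socket.recv() returns whatever bytes are currently available, no framing is assumed in either direction.

```python
import selectors
import socket
import threading


def pump(sock_a: socket.socket, sock_b: socket.socket, bufsize: int = 4096) -> None:
    """Relay raw bytes in both directions until both sides have sent EOF."""
    peer = {sock_a: sock_b, sock_b: sock_a}
    sel = selectors.DefaultSelector()
    for s in peer:
        sel.register(s, selectors.EVENT_READ)
    open_readers = len(peer)
    while open_readers:
        for key, _ in sel.select():
            src = key.fileobj
            data = src.recv(bufsize)  # returns whatever is available; no framing
            if data:
                peer[src].sendall(data)
            else:
                # EOF from src: stop watching it and pass the EOF along
                sel.unregister(src)
                open_readers -= 1
                try:
                    peer[src].shutdown(socket.SHUT_WR)
                except OSError:
                    pass  # the other side may already be gone
    sel.close()


def serve_once(listen_port: int, outbound_port: int) -> None:
    """Accept one client and forward it to localhost:outbound_port."""
    with socket.create_server(("", listen_port)) as server:
        inbound, addr = server.accept()
        print(f"Connection accepted from {addr}")
        with inbound, socket.create_connection(("localhost", outbound_port)) as outbound:
            pump(inbound, outbound)


def demo() -> tuple:
    """Exercise pump() with socketpairs standing in for the two TCP links."""
    client, fwd_in = socket.socketpair()
    fwd_out, upstream = socket.socketpair()
    t = threading.Thread(target=pump, args=(fwd_in, fwd_out))
    t.start()
    client.sendall(b"Hi from inbound\n")
    client.shutdown(socket.SHUT_WR)
    upstream.sendall(b"Hi from outbound\n")
    upstream.shutdown(socket.SHUT_WR)

    def read_all(sock):
        chunks = []
        while chunk := sock.recv(4096):
            chunks.append(chunk)
        return b"".join(chunks)

    result = (read_all(upstream), read_all(client))
    t.join()
    for s in (client, fwd_in, fwd_out, upstream):
        s.close()
    return result


if __name__ == "__main__":
    print(demo())  # swap in serve_once(19001, 19002) to try it with the nc commands
```

Passing EOF along with shutdown(socket.SHUT_WR) instead of closing the socket lets each direction finish independently, so the loop only ends once both sides are done sending.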
Here's an example (with a decent amount of copy-paste from the docs) of listening with socketserver and, in the connection handler, opening a new connection to another port to forward the data on.
import socketserver
import socket


class MyTCPServer(socketserver.TCPServer):
    def __init__(self, *args, outbound_port=None, **kwargs):
        self.outbound_port = outbound_port
        super().__init__(*args, **kwargs)


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())
        # also forward the data to a second server (nc)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Connect to server and send data
            sock.connect(("localhost", self.server.outbound_port))
            sock.sendall(self.data.upper() + b"\n")
            # Receive data from the server and shut down
            received = sock.recv(1024)
            print(received)


if __name__ == "__main__":
    HOST, PORT = "localhost", 9999
    # Create the server, binding to localhost on port 9999
    with MyTCPServer((HOST, PORT), MyTCPHandler, outbound_port=9998) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.serve_forever()
Using the same nc commands (with the ports changed to 9999 and 9998) should, I believe, give the output you expect.
If you do in fact want to send more structured data than raw bytes, here's an example with a similar layout that uses mp.connection instead:
import multiprocessing as mp
from multiprocessing.connection import Listener, Client
import sys


def server_A():
    listen_port, outbound_port = 9999, 9998
    with Listener(('', listen_port)) as listener:
        while True:  # serve forever (until KeyboardInterrupt) right in main...
            try:
                conn1 = listener.accept()
            except mp.AuthenticationError:
                print("auth error")
                continue  # no usable connection, so wait for the next one
            except KeyboardInterrupt:
                break
            obj = conn1.recv()  # get from client
            forward = ("object received: server A", obj)
            with Client(('localhost', outbound_port)) as conn2:
                conn2.send(forward)  # forward to B
                response = conn2.recv()  # recv from B
            conn1.send(response)  # send response from B back to client
            print(conn1.recv_bytes())
            print("but sending bytes can have an overhead benefit for large blocks of data (arrays for example)")
            conn1.close()  # because we're not using a context manager for conn1


def server_B():
    listen_port = 9998
    with Listener(('', listen_port)) as listener:
        while True:  # serve forever (until KeyboardInterrupt) right in main...
            try:
                conn = listener.accept()
            except mp.AuthenticationError:
                print("auth error")
                continue
            except KeyboardInterrupt:
                break
            obj = conn.recv()
            response = ("object received: server B", obj)
            conn.send(response)
            conn.close()  # because we're not using a context manager


def client():
    with Client(('localhost', 9999)) as conn:
        conn.send({"some": ["data"]})  # connections are useful for sending more structured data than bytes
        print(conn.recv())
        conn.send_bytes(b"binary data isn't sent totally raw because there's a \"protocol\" for communication")


if __name__ == "__main__":
    # run with argument A, B or C in separate consoles; start B and A before C
    if "A" in sys.argv[1]:
        server_A()
    elif "B" in sys.argv[1]:
        server_B()
    elif "C" in sys.argv[1]:
        client()
*Note: both examples were tested on Ubuntu 20.04 with Python 3.8.5.
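To make the contrast with raw sockets concrete, here's a small sketch using multiprocessing.Pipe() in a single process (only for brevity; normally each end lives in a different process). It shows the two properties the client() comments refer to: send()/recv() carry arbitrary picklable objects, and send_bytes()/recv_bytes() preserve message boundaries, which a plain socket.recv() does not guarantee.

```python
from multiprocessing import Pipe

# Pipe() returns two connected Connection objects
left, right = Pipe()

# send()/recv() pickle arbitrary objects, so structure survives the trip
left.send({"some": ["data"]})
obj = right.recv()

# send_bytes()/recv_bytes() skip pickling but still frame each message,
# so two sends come back as two separate messages rather than one merged blob
left.send_bytes(b"first")
left.send_bytes(b"second")
first = right.recv_bytes()
second = right.recv_bytes()

print(obj, first, second)
left.close()
right.close()
```

Running it should print {'some': ['data']} b'first' b'second'.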