I'm trying to emulate the behavior of the following simple socat(1)
command:
socat tcp-listen:SOME_PORT,fork,reuseaddr exec:'SOME_PROGRAM'
The above command creates a forking TCP server which forks and executes SOME_PROGRAM
for each connection, redirecting both stdin
and stdout
of said command to the TCP socket.
Here's what I'd like to achieve:
- Use asyncio to handle multiple concurrent connections.
- Run SOME_PROGRAM as a sub-process.
- Pipe any data received from the socket to SOME_PROGRAM's standard input.
- Pipe any data from SOME_PROGRAM's standard output to the socket.
- When SOME_PROGRAM exits, write a goodbye message along with the exit code to the socket and close the connection.

I would like to do this in pure Python, without external libraries, using only the asyncio module.
Here's the code I wrote so far:
import asyncio

class ServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.client_addr = transport.get_extra_info('peername')
        self.transport = transport
        self.child_process = None
        print('Connection with {} established'.format(self.client_addr))
        asyncio.ensure_future(self._create_subprocess())

    def connection_lost(self, exception):
        print('Connection with {} closed.'.format(self.client_addr))
        if self.child_process.returncode is not None:
            self.child_process.terminate()

    def data_received(self, data):
        print('Data received: {!r}'.format(data))
        # Make sure the process has been spawned
        # Does this even make sense? Looks so awkward to me...
        while self.child_process is None:
            continue
        # Write any received data to child_process' stdin
        self.child_process.stdin.write(data)

    async def _create_subprocess(self):
        self.child_process = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE
        )
        # Start reading child stdout
        asyncio.ensure_future(self._pipe_child_stdout())
        # Ideally I would register some callback here so that when
        # child_process exits I can write to the socket a goodbye
        # message and close the connection, but I don't know how
        # I could do that...

    async def _pipe_child_stdout(self):
        # This does not seem to work, this function returns b'', that is an
        # empty buffer, AFTER the process exits...
        data = await self.child_process.stdout.read(100)  # Arbitrary buffer size
        print('Child process data: {!r}'.format(data))
        if data:
            # Send to socket
            self.transport.write(data)
            # Reschedule to read more data
            asyncio.ensure_future(self._pipe_child_stdout())


SERVER_PORT = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ServerProtocol, '0.0.0.0', SERVER_PORT)
    server = loop.run_until_complete(coro)
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
And here is the ./test program I'm trying to run as a subprocess:
#!/usr/bin/env python3
import sys

if sys.stdin.read(2) == 'a\n':
    sys.stdout.write('Good!\n')
else:
    sys.exit(1)

if sys.stdin.read(2) == 'b\n':
    sys.stdout.write('Wonderful!\n')
else:
    sys.exit(1)

sys.exit(0)
Unfortunately the above code doesn't really work, and I'm kind of lost about what to try next.
What works as intended:

- The child process is spawned correctly; I can see it in htop, and I can also see that as soon as I send b\n it terminates.

What doesn't work as intended:

Basically anything else...

- await self.child_process.stdout.read(100) never seems to terminate: instead, it only terminates after the child process dies and the result is just b'' (an empty bytes object).
- I'd like to check self.child_process.returncode when this happens, but I don't know how to do this in a way which makes sense.

What I tried:

- Using asyncio.loop.subprocess_exec() instead of asyncio.create_subprocess_exec(). This solves the problem of knowing when the process terminates, since I can instantiate a subclass of asyncio.SubprocessProtocol and use its process_exited() method, but it doesn't really help me at all, since if I do it this way I don't have a means to talk to the process' stdin or stdout anymore! That is, I don't have a Process object to interact with...
- Playing with asyncio.loop.connect_write_pipe() and loop.connect_read_pipe(), with no luck.

So, could someone please help me figure out what I'm doing wrong? There has to be a way to make this work smoothly. When I first started, I was looking for a way to just effortlessly use some pipe redirection, but I don't know if that's even possible at this point. Is it? It looks like it should be.
Your code has two immediate implementation issues:

- The busy-wait in data_received (while self.child_process is None: continue) blocks the event loop's only thread; if it is ever entered, the _create_subprocess coroutine never gets a chance to run, so self.child_process can never stop being None.
- The trailing newline never reaches the subprocess: when the TCP client sends "a\n", the subprocess will receive just "a". That way the subprocess never encounters the expected "a\n" string, and it always terminates after reading two bytes. This explains the empty string (EOF) coming from the subprocess.

The other issue is on the design level. As mentioned in the comments, unless your explicit intention is to implement a new asyncio protocol, it is recommended to stick to the higher-level stream-based API, in this case the start_server function. The even lower-level functionality like SubprocessProtocol, connect_write_pipe, and connect_read_pipe is also not something you would want to use in application code. The rest of this answer assumes a stream-based implementation.
start_server accepts a coroutine which will be spawned as a new task whenever a client connects. The coroutine is called with two asyncio stream arguments, one for reading and one for writing, and it contains the logic of communicating with a client; in your case it would spawn the subprocess and transfer data between it and the client.
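To illustrate the shape of that API, here is a bare-bones skeleton with a placeholder echo handler standing in for the subprocess logic:

import asyncio

async def handle_client(reader, writer):
    # Spawned as a new task for every client that connects;
    # this placeholder just echoes data back until the client disconnects.
    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)
    writer.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.start_server(handle_client, '0.0.0.0', 6666))
loop.run_forever()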
Note that the bidirectional data transfer between the socket and the subprocess cannot be achieved with a simple loop with reads followed by writes. For example, consider this loop:
# INCORRECT: can deadlock (and also doesn't detect EOF)
child = await asyncio.create_subprocess_exec(...)
while True:
    proc_data = await child.stdout.read(1024)  # (1)
    sock_writer.write(proc_data)
    sock_data = await sock_reader.read(1024)
    child.stdin.write(sock_data)               # (2)
This kind of loop is prone to deadlocks. If the subprocess is responding to data it receives from the TCP client, it will sometimes only provide output after it receives some input. That will block the loop at (1) indefinitely, because it can get the data from the child's stdout only after sending the child the sock_data, which happens later, at (2). Effectively, (1) waits for (2) and vice versa, constituting a deadlock. Note that reversing the order of transfers won't help, because then the loop would deadlock if the TCP client is processing the output of the server's subprocess.
With asyncio at our disposal, such a deadlock is easy to avoid: simply spawn two coroutines in parallel, one that transfers data from the socket to the subprocess's stdin, and another that transfers data from the subprocess's stdout to the socket. For example:
# correct: deadlock-free (and detects EOF)
async def _transfer(src, dest):
    while True:
        data = await src.read(1024)
        if data == b'':
            break
        dest.write(data)

child = await asyncio.create_subprocess_exec(...)
loop.create_task(_transfer(child.stdout, sock_writer))
loop.create_task(_transfer(sock_reader, child.stdin))
await child.wait()
The difference between this setup and the first while loop is that here the two transfers are independent of each other. The deadlock cannot occur because the read from the socket never waits for the read from the subprocess and vice versa.
Applied to the question, the whole server would look like this:
import asyncio

class ProcServer:
    async def _transfer(self, src, dest):
        while True:
            data = await src.read(1024)
            if data == b'':
                break
            dest.write(data)

    async def _handle_client(self, r, w):
        loop = asyncio.get_event_loop()
        print(f'Connection from {w.get_extra_info("peername")}')
        child = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM, stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)
        sock_to_child = loop.create_task(self._transfer(r, child.stdin))
        child_to_sock = loop.create_task(self._transfer(child.stdout, w))
        await child.wait()
        sock_to_child.cancel()
        child_to_sock.cancel()
        w.write(b'Process exited with status %d\n' % child.returncode)
        w.close()

    async def start_serving(self):
        await asyncio.start_server(self._handle_client,
                                   '0.0.0.0', SERVER_PORT)


SERVER_PORT = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    server = ProcServer()
    loop.run_until_complete(server.start_serving())
    loop.run_forever()
The accompanying test program must also be modified to call sys.stdout.flush() after every sys.stdout.write(); otherwise the messages linger in its stdio buffers instead of being sent to the parent.
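For example, applied to the test program from the question:

#!/usr/bin/env python3
import sys

if sys.stdin.read(2) == 'a\n':
    sys.stdout.write('Good!\n')
    sys.stdout.flush()  # make the output visible to the parent immediately
else:
    sys.exit(1)

if sys.stdin.read(2) == 'b\n':
    sys.stdout.write('Wonderful!\n')
    sys.stdout.flush()
else:
    sys.exit(1)

sys.exit(0)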
"When I first started I was looking for a way to just effortlessly use some pipe redirection, but I don't know if that's even possible at this point. Is it? It looks like it should be."
On Unix-like systems it is certainly possible to redirect a socket to a spawned subprocess, so that the subprocess is directly talking to the client. (The old inetd Unix server works like this.) But that mode of operation is not supported by asyncio, for two reasons:

- it wouldn't work on all systems supported by Python and asyncio;
- it would take the data exchange out of asyncio's control, so asyncio could neither observe it nor intervene in it.

Even if you don't care about portability, consider the second point: you might need to process or log the data exchanged between the TCP client and the subprocess, and you can't do that if they're welded together on the kernel level. Also, timeouts and cancellation are much easier to implement in asyncio coroutines than when dealing with just an opaque subprocess.
If non-portability and inability to control the communication are fine for your use case, then you probably don't need asyncio in the first place - nothing prevents you from spawning a thread that runs a classic blocking server which handles each client with the same sequence of os.fork, os.dup2, and os.execlp that you'd write in C.
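A minimal sketch of that blocking server (Unix-only, untested; run it on a thread if the rest of the application needs the main thread) could look like this:

import os
import signal
import socket

SERVER_PORT = 6666
TARGET_PROGRAM = ['./test']

def serve():
    # Reap children automatically so exited handlers don't become zombies
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(('0.0.0.0', SERVER_PORT))
    srv.listen(5)
    while True:
        conn, _addr = srv.accept()
        if os.fork() == 0:
            # Child: attach the socket to stdin/stdout, then replace this
            # process with the target program, like socat's exec: address
            srv.close()
            os.dup2(conn.fileno(), 0)
            os.dup2(conn.fileno(), 1)
            os.execlp(TARGET_PROGRAM[0], *TARGET_PROGRAM)
        conn.close()  # parent: the child now owns the connection

if __name__ == '__main__':
    serve()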
EDIT
As the OP points out in a comment, the original code handles the TCP client disconnecting by killing the child process. At the stream layer, a connection loss is reflected by the stream either signaling end-of-file or raising an exception. In the above code, one could easily react to connection loss by replacing the generic self._transfer()
with a more specific coroutine that handles that case. For example, instead of:
sock_to_child = loop.create_task(self._transfer(r, child.stdin))
...one could write:
sock_to_child = loop.create_task(self._sock_to_child(r, child))
and define _sock_to_child
like this (untested):
async def _sock_to_child(self, reader, child):
    try:
        await self._transfer(reader, child.stdin)
    except IOError as e:
        # IO errors are an expected part of the workflow,
        # we don't want to propagate them
        print('exception:', e)
    child.kill()
If the child outlives the TCP client, the child.kill() line will likely never execute, because the coroutine will be canceled by _handle_client while suspended in src.read() inside _transfer().