Reputation: 53
I have a situation where subprocess communicate() hangs when I run the subprocess inside an asyncio event loop and the whole thing runs inside a separate thread.
I learned that in order to run a subprocess in a separate thread, I need to have:
1. an event loop running in the main thread, and
2. a child watcher initialized in the main thread.
With both conditions in place the subprocess starts, but communicate() now hangs. The same code works if I call it from the main thread.
Digging further, I found that communicate() hangs because the process never appears to finish on its own, i.e. await process.wait()
is what actually hangs.
I have seen communicate() hang when the command run in the subprocess itself hangs, but that is not the case here.
import asyncio
import shlex
import threading
import subprocess

async def sendcmd(cmd):
    cmdseq = tuple(shlex.split(cmd))
    print(cmd)
    p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.pid)
    output = (await asyncio.wait_for(p.communicate(), 5))[0]
    output = output.decode('utf8')
    print(output)
    return output

async def myfunc(cmd):
    o = await sendcmd(cmd)
    return o

def myfunc2():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = []
    tasks.append(asyncio.ensure_future(myfunc('uname -a')))
    loop.run_until_complete(asyncio.gather(*tasks))

async def myfunc3():
    t = threading.Thread(target=myfunc2)
    t.start()
    t.join()

def main():
    asyncio.get_child_watcher()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(myfunc3()))
    loop.close()

main()
Upvotes: 5
Views: 2214
Reputation: 324
I think this fixes it: use loop.run_in_executor() to run the thread instead of starting and joining it yourself.
import asyncio
import shlex
import threading
import subprocess
import logging

async def sendcmd(cmd):
    cmdseq = tuple(shlex.split(cmd))
    print(cmd)
    p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.pid)
    output = (await asyncio.wait_for(p.communicate(), 5))[0]
    output = output.decode('utf8')
    print(output)
    return output

async def myfunc(cmd):
    o = await sendcmd(cmd)
    return o

def myfunc2():
    # Runs in a worker thread with its own event loop.
    thread_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(thread_loop)
    thread_loop.set_debug(True)
    tasks = []
    tasks.append(asyncio.ensure_future(myfunc('uname -a')))
    thread_loop.run_until_complete(asyncio.gather(*tasks))
    thread_loop.close()

async def myfunc3(loop=None):
    # Run the blocking thread body in the default executor instead of
    # t.join(), so the main event loop (and its child watcher) keeps running.
    await loop.run_in_executor(None, myfunc2)

def main():
    logfilename = 'test.log'
    print('Writing log to {}'.format(logfilename))
    logging.basicConfig(filename=logfilename, level=logging.INFO,
                        format='%(asctime)s %(name)s %(module)s %(levelname)-8s %(message)s')
    logging.getLogger('asyncio').setLevel(logging.DEBUG)
    root = logging.getLogger(__name__)
    # Create the child watcher from the main thread, before the worker starts.
    cw = asyncio.get_child_watcher()
    main_loop = asyncio.get_event_loop()
    main_loop.run_until_complete(asyncio.ensure_future(myfunc3(loop=main_loop)))
    cw.close()
    main_loop.close()

main()
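On Python 3.9 or later, the same pattern can be sketched with asyncio.run() and asyncio.to_thread(), which runs a blocking function in the default executor much like run_in_executor(None, ...). This is a sketch assuming Python 3.9+ on Unix; since Python 3.8 the default child watcher is ThreadedChildWatcher, which waits for each child in a helper thread, so a loop living in a worker thread can create subprocesses without the main-thread setup the question describes. The names run_in_worker_thread and main are illustrative, not from the original code.

```python
import asyncio
import shlex


async def sendcmd(cmd: str, timeout: float = 5) -> str:
    # Split the command line into argv without going through a shell.
    argv = shlex.split(cmd)
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout)
    return stdout.decode("utf8")


def run_in_worker_thread(cmd: str) -> str:
    # asyncio.run() creates, runs and closes a fresh loop for this thread.
    return asyncio.run(sendcmd(cmd))


async def main(cmd: str) -> str:
    # asyncio.to_thread() (Python 3.9+) runs the blocking function in the
    # default executor, so this loop stays responsive while it waits.
    return await asyncio.to_thread(run_in_worker_thread, cmd)
```

Usage: asyncio.run(main("uname -a")) returns the command's standard output as a string.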
Upvotes: 1
Reputation: 324
It looks like the subprocess's SIGCHLD isn't being received by the worker thread but by the parent (main) thread. This means the worker's process.wait() is never woken up when the child exits. There is another discussion about this here.
It looks like the child watcher is supposed to detect the SIGCHLD and propagate it to the other threads (or pids) and their event loops, which also seems to be its primary design purpose. (The documentation is sparse, so reading the source is required.)
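The idea of a watcher that works without any SIGCHLD handler can be sketched as follows: a helper thread blocks until the child exits, then hands the exit status to the owning loop with loop.call_soon_threadsafe(). This is roughly what asyncio's ThreadedChildWatcher (the default since Python 3.8) does with os.waitpid(); the sketch below uses subprocess.Popen.wait() instead, and the helper names wait_child_in_thread, run_and_wait and run_in_worker_thread are hypothetical.

```python
import asyncio
import concurrent.futures
import subprocess
import sys
import threading


def wait_child_in_thread(proc, loop):
    """Hypothetical helper: resolve a future on `loop` when `proc` exits."""
    fut = loop.create_future()

    def _set_result(returncode):
        # Runs on the loop's own thread, where touching the future is safe.
        if not fut.done():
            fut.set_result(returncode)

    def _waiter():
        # Blocks only this helper thread (Popen.wait() calls waitpid() on Unix).
        returncode = proc.wait()
        # asyncio futures are not thread-safe, so schedule the result on the loop.
        loop.call_soon_threadsafe(_set_result, returncode)

    threading.Thread(target=_waiter, daemon=True).start()
    return fut


async def run_and_wait(argv):
    loop = asyncio.get_running_loop()
    proc = subprocess.Popen(argv)
    return await wait_child_in_thread(proc, loop)


def run_in_worker_thread(argv):
    # Mirror the question's setup: the event loop lives in a non-main thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, run_and_wait(argv)).result()
```

Because nothing here depends on signal delivery, it does not matter which thread the kernel (or Python) picks to handle SIGCHLD.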
Note: I think t.join() is blocking the main thread, which runs the child watcher, so that needs to be fixed. I replaced it with a while loop that ends the main event loop once t.is_alive() returns False.
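A minimal sketch of that polling workaround, assuming a hypothetical blocking_work() standing in for the question's myfunc2(): instead of t.join(), the coroutine sleeps on the event loop between t.is_alive() checks, so the main loop keeps processing events. The heartbeat task is only there to show that the loop stays responsive while the thread runs.

```python
import asyncio
import threading
import time


def blocking_work(results):
    # Hypothetical stand-in for the question's myfunc2().
    time.sleep(0.2)
    results.append("done")


async def heartbeat(ticks, interval=0.05):
    # Keeps ticking only if the main event loop is not blocked.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def myfunc3(poll_interval=0.05):
    results, ticks = [], []
    hb = asyncio.create_task(heartbeat(ticks))
    t = threading.Thread(target=blocking_work, args=(results,))
    t.start()
    # Unlike t.join(), this yields to the event loop between checks.
    while t.is_alive():
        await asyncio.sleep(poll_interval)
    hb.cancel()
    return results, len(ticks)


def main():
    return asyncio.run(myfunc3())
```

The run_in_executor() approach from the first answer achieves the same thing without polling.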
I notice that signal_noop is firing, so that's good. The issue seems related to signal.set_wakeup_fd(self._csock.fileno()), which seems to be set properly. I need to debug a bit more to find out how that event is processed and why the main event loop isn't getting the signal. So far I have noticed that _process_self_data(self, data) in unix_events.py is never called.
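To see the wakeup-fd mechanism in isolation, here is a small standalone sketch (not asyncio's code): it installs a no-op Python handler, points signal.set_wakeup_fd() at a non-blocking socket, raises a signal, and reads back the byte that the C-level handler writes, which is the signal number. It assumes Unix (SIGUSR1), Python 3.8+ for signal.raise_signal(), and that it is called from the main thread, since only the main thread may set handlers or the wakeup fd. The function name wakeup_byte_for is hypothetical.

```python
import signal
import socket


def wakeup_byte_for(signum=signal.SIGUSR1):
    """Hypothetical demo: return the byte set_wakeup_fd() writes for `signum`."""
    rsock, wsock = socket.socketpair()
    # set_wakeup_fd() requires a non-blocking file descriptor.
    wsock.setblocking(False)
    rsock.setblocking(False)
    # A no-op Python handler, similar in spirit to asyncio's signal_noop.
    old_handler = signal.signal(signum, lambda sig, frame: None)
    old_fd = signal.set_wakeup_fd(wsock.fileno())
    try:
        # raise() runs the C-level handler before returning; that handler
        # writes the signal number to the wakeup fd.
        signal.raise_signal(signum)
        return rsock.recv(1)
    finally:
        signal.set_wakeup_fd(old_fd)
        signal.signal(signum, old_handler)
        rsock.close()
        wsock.close()
```

In asyncio's Unix loop the reading end of such a socket pair is watched by the loop itself, which then dispatches the signal numbers it reads; if that loop is blocked (for example in t.join()), nothing reads the byte.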
From the Python signal module documentation: Python signal handlers are always executed in the main Python thread, even if the signal was received in another thread. This means that signals can’t be used as a means of inter-thread communication. You can use the synchronization primitives from the threading module instead.
Besides, only the main thread is allowed to set a new signal handler.
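A short sketch that demonstrates this rule: a worker thread sends SIGUSR1 to its own process, and the Python-level handler records which thread it ran in. It assumes Unix and must be called from the main thread (signal.signal() raises ValueError elsewhere); the function name handler_threads is hypothetical.

```python
import os
import signal
import threading
import time


def handler_threads(signum=signal.SIGUSR1, timeout=5.0):
    """Hypothetical demo: send a signal from a worker thread, report where the handler ran."""
    ran_in = []

    def handler(sig, frame):
        # Record the thread executing the Python-level handler.
        ran_in.append(threading.current_thread().name)

    old_handler = signal.signal(signum, handler)  # only allowed in the main thread
    try:
        worker = threading.Thread(
            target=os.kill, args=(os.getpid(), signum), name="worker"
        )
        worker.start()
        worker.join()
        # Give the main thread a chance to run the pending Python handler.
        deadline = time.monotonic() + timeout
        while not ran_in and time.monotonic() < deadline:
            time.sleep(0.01)
        return ran_in
    finally:
        signal.signal(signum, old_handler)
```

Even though the signal originates in the worker thread, the handler reports the main thread, which is why an event loop in a worker thread cannot rely on its own SIGCHLD handling.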
Upvotes: 4