The following code (in the main thread) works well: I grep some files and read the results until the first 100 matches are found (writing them to a file), then exit:
from subprocess import Popen, PIPE

# search_string, DATA_PATH, output_file and MAX_RESULTS are defined elsewhere
command = 'grep -F "%s" %s*.txt' % (search_string, DATA_PATH)
p = Popen(['/bin/bash', '-c', command], stdout = PIPE)
f = open(output_file, 'w+')
num_lines = MAX_RESULTS
while True:
    line = p.stdout.readline()
    print num_lines
    if line != '':
        f.write(line)
        num_lines = num_lines - 1
        if num_lines == 0:
            break
    else:
        # EOF: grep has finished
        break
The very same code, used inside a Process subclass, always prints "grep: writing output: Broken pipe" in the console:
import re
from multiprocessing import Process
from subprocess import Popen, PIPE

class Search(Process):
    def __init__(self, search_id, search_string):
        self.search_id = search_id
        self.search_string = search_string
        self.grepped = ''
        Process.__init__(self)

    def run(self):
        output_file = TMP_PATH + self.search_id
        # fixed-string search (-F) if there are no regex chars, else Perl regex (-P)
        flag = '-F' if re.match(r"^[a-zA-Z0\ ]*$", self.search_string) else '-P'
        command = 'grep %s "%s" %s*.txt' % (flag, self.search_string, DATA_PATH)
        p = Popen(['/bin/bash', '-c', command], stdout = PIPE)
        f = open(output_file, 'w+')
        num_lines = MAX_RESULTS
        while True:
            line = p.stdout.readline()
            print num_lines
            if line != '':
                f.write(line)
                num_lines = num_lines - 1
                if num_lines == 0:
                    break
            else:
                break
How come? How can I fix this?
I can reproduce the error message like this:
import multiprocessing as mp
import subprocess
import shlex

def worker():
    proc = subprocess.Popen(shlex.split('''
        /bin/bash -c "grep -P 'foo' /tmp/test.txt"
        '''), stdout = subprocess.PIPE)
    line = proc.stdout.readline()
    print(line)
    # proc.terminate()   # This fixes the problem

if __name__=='__main__':
    N = 6000
    with open('/tmp/test.txt', 'w') as f:
        f.write('bar foo\n'*N)   # <--- Increasing this number causes grep: writing output: Broken pipe
    p = mp.Process(target = worker)
    p.start()
    p.join()
If the above code does not produce the error for you, increase the size of the file /tmp/test.txt by increasing N. (Conversely, you can hide the fact that there is a bug in the code by reducing N.)
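A plausible reason the file size matters (my assumption; the answer does not say this): a pipe holds only a limited number of bytes. Once grep's output exceeds that, grep has to block until someone reads, so it is certain to still be running when the worker exits. With a small output, grep may finish writing and exit first, and the bug stays hidden. The sketch below measures the capacity of an empty pipe empirically by writing to a non-blocking pipe until a write would block; `pipe_capacity` is a hypothetical helper name.

```python
import os


def pipe_capacity():
    """Count how many bytes fit into an empty pipe before a write would block."""
    r, w = os.pipe()
    os.set_blocking(w, False)  # a full pipe now raises instead of blocking
    chunk = b'x' * 512
    total = 0
    try:
        while True:
            total += os.write(w, chunk)
    except BlockingIOError:
        pass  # pipe is full: a blocking writer such as grep would stall here
    finally:
        os.close(r)
        os.close(w)
    return total


if __name__ == '__main__':
    size = pipe_capacity()
    print(size)  # typically 65536 on Linux (16 pages of 4 KiB)
    # each 'bar foo\n' line is 8 bytes, so this many lines fit before grep blocks
    print(size // len('bar foo\n'))
```

Nothing reads from the pipe here, so the count reflects only the kernel buffer; in the answer's worker, readline() also pulls some data into Python's own read buffer, so the exact threshold in N is fuzzier than this number.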
If the worker process ends before the grep subprocess does, the read end of grep's stdout pipe is closed, and grep gets a SIGPIPE the next time it writes. grep responds by printing "grep: writing output: Broken pipe" to stderr, once for every line it is still processing.
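The mechanism can be reproduced without grep. This is a minimal sketch, assuming Python 3.2+ on Linux with the `yes` utility installed; `yes` stands in for grep only because it never stops writing, and the helper names are hypothetical. After one line is read, the read end is closed and `yes` is left writing into a pipe with no reader. What happens next depends on how SIGPIPE is handled in the child: Python ignores SIGPIPE in its own process, Python 3's Popen restores the default handler in children (`restore_signals=True`), and Python 2's Popen had no such option, so its children inherited the ignored signal. A writer that ignores SIGPIPE gets an EPIPE error from write() instead and reports it, which matches the grep message in the question.

```python
import signal
import subprocess


def ignore_sigpipe():
    # Runs in the child just before exec: mimic a child that inherited
    # SIG_IGN for SIGPIPE, as Python 2's Popen children did.
    signal.signal(signal.SIGPIPE, signal.SIG_IGN)


def stop_reading_early(sigpipe_ignored):
    # `yes` prints "y" forever, so it is certainly still writing when we stop.
    proc = subprocess.Popen(
        ['yes'],
        stdout=subprocess.PIPE,
        restore_signals=not sigpipe_ignored,
        preexec_fn=ignore_sigpipe if sigpipe_ignored else None,
    )
    first = proc.stdout.readline()
    proc.stdout.close()  # no reader left: the next write hits a broken pipe
    return first, proc.wait()


if __name__ == '__main__':
    # default disposition: yes is killed by SIGPIPE without any message
    print(stop_reading_early(False))  # (b'y\n', -13) on Linux
    # SIGPIPE ignored: write() fails with EPIPE, yes reports it on stderr
    print(stop_reading_early(True))
```

A negative return code means the child was killed by that signal number; a positive one means it exited on its own after noticing the error.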
The fix is to terminate the grep subprocess with proc.terminate() before worker ends.
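Applied to the question's Search.run(), the fix might look like the following sketch. It is an adaptation, not the answerer's code: `search_worker` and its parameters are hypothetical names, it calls grep with an argument list instead of /bin/bash -c (so the question's `*.txt` glob would have to be expanded in Python, for example with glob.glob), and it waits for grep after terminating it so that no zombie process is left behind.

```python
import multiprocessing as mp
import os
import subprocess
import tempfile


def search_worker(pattern, paths, output_file, max_results):
    """Copy at most max_results lines matching pattern to output_file."""
    # Argument list instead of '/bin/bash -c': no shell quoting is needed,
    # but shell globs such as '*.txt' are not expanded either.
    proc = subprocess.Popen(['grep', '-F', pattern] + list(paths),
                            stdout=subprocess.PIPE)
    try:
        with open(output_file, 'wb') as f:
            for _ in range(max_results):
                line = proc.stdout.readline()
                if not line:  # EOF: grep found fewer than max_results matches
                    break
                f.write(line)
    finally:
        # Stop grep before this worker exits, so it is never left writing
        # into a pipe whose reader has gone away.
        if proc.poll() is None:
            proc.terminate()
        proc.wait()  # reap grep so no zombie is left behind
        proc.stdout.close()


if __name__ == '__main__':
    workdir = tempfile.mkdtemp()
    data_file = os.path.join(workdir, 'test.txt')
    out_file = os.path.join(workdir, 'results.txt')
    with open(data_file, 'w') as f:
        f.write('bar foo\n' * 100000)  # far more matches than will be read
    p = mp.Process(target=search_worker,
                   args=('foo', [data_file], out_file, 100))
    p.start()
    p.join()
    with open(out_file, 'rb') as f:
        print(len(f.readlines()))  # 100
```

Putting the terminate() call in a finally block also stops grep if writing the output file raises, not only on the normal path.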