Troper

Reputation: 178

How to resolve BrokenPipeError when using multiprocessing in Python

I am learning multiprocessing and had no issues until I ran into this one while working with queues. Essentially, the queue gets filled, but then something goes wrong and the program crashes.

I am running Python 3.6.8 on Windows 10. multiprocessing worked fine when I was not using queues (to learn, I built a snippet similar to the one below, but without queues).

import glob, multiprocessing, os

def appendFilesThreaded(inputDirectory, outputDirectory, inputFileType=".txt", outputFileName="appended_files.txt"):
    files = glob.glob(inputDirectory+'*'+inputFileType)
    fileQueue = multiprocessing.Queue()
    for file in files:
        fileQueue.put(file)
    threadsToUse = max(1, multiprocessing.cpu_count()-1)
    print("Using " + str(threadsToUse) + " worker threads.")
    processes = []
    for i in range(threadsToUse):
        p = multiprocessing.Process(target=appendFilesWorker, args=(fileQueue,outputDirectory+"temp-" + str(i) + outputFileName))
        processes.append(p)
        p.start()
    for process in processes:
        process.join()
    with open(outputDirectory + outputFileName, 'w') as outputFile:
        for i in range(threadsToUse):
            with open(outputDirectory+"temp-" + str(i) + outputFileName) as fileToAppend:
                outputFile.write(fileToAppend.read())
            os.remove(outputDirectory+"temp-" + str(i) + outputFileName)
    print('Done')
def appendFilesWorker(fileQueue, outputFileNamePath):
    with open(outputFileNamePath, 'w') as outputFile:
        while not fileQueue.empty:
            with open(fileQueue.get()) as fileToAppend:
                outputFile.write(fileToAppend.read())

if __name__ == '__main__':
    appendFilesThreaded(inputDir,outputDir)

I would expect this to append the files successfully, but it crashes with BrokenPipeError: [WinError 232] The pipe is being closed.

Upvotes: 2

Views: 2518

Answers (1)

Troper

Reputation: 178

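Found the issue: the worker's while loop tests fileQueue.empty without calling it. You need the parentheses: fileQueue.empty(). Without them, fileQueue.empty is just the bound method object, which is always truthy, so "not fileQueue.empty" is always False and the loop body never runs. The workers then exit without reading anything from the queue.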

I'll leave my embarrassing mistake up in case it helps others :)
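
For anyone who wants to see the difference in isolation, here is a minimal sketch. The helper names loop_condition_without_call and drain_until_sentinel are made up for illustration and are not part of the code in the question. The first function shows why the original loop condition is always False. The second is a sentinel-based alternative to polling empty(), which is a different technique from the one-character fix above; I include it because the multiprocessing docs note that empty() is not reliable for these queues.

```python
import multiprocessing


def loop_condition_without_call(file_queue):
    # file_queue.empty (no parentheses) is a bound method object. Method
    # objects are always truthy, so this is False whatever the queue holds.
    return not file_queue.empty


def drain_until_sentinel(file_queue, sentinel=None):
    """Hypothetical helper: collect items until the sentinel value arrives."""
    items = []
    while True:
        item = file_queue.get()  # blocks until an item is available
        if item is sentinel:
            break
        items.append(item)
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for name in ("a.txt", "b.txt"):  # placeholder file names
        q.put(name)
    q.put(None)  # sentinel marking the end of the work

    print(loop_condition_without_call(q))  # False, even though q has items
    print(drain_until_sentinel(q))         # ['a.txt', 'b.txt']
```

If you use the sentinel approach with several worker processes, put one sentinel on the queue per worker so that each of them sees one and stops.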

Upvotes: 1
