leo

How to use pipe correctly in multiple processes(>2)

How do I use a pipe correctly with more than two processes, e.g. one producer and several consumers?

The code below fails on Linux but works fine on Windows:

import multiprocessing, time

def consumer(pipe,id):
    output_p, input_p = pipe
    input_p.close()                    
    while True:
        try:
            item = output_p.recv()
        except EOFError:
            break
        print("%s consume:%s" % (id,item))
        #time.sleep(3)      # without this sleep the code fails on Linux,
                            # but it works on Windows
    print('Consumer done')

def producer(sequence, input_p):
    for item in sequence:
        print('produce:',item)
        input_p.send(item) 
        time.sleep(1)

if __name__ == '__main__':
    (output_p, input_p) = multiprocessing.Pipe()

    # create two consumer processes
    cons_p1 = multiprocessing.Process(target=consumer,args=((output_p,input_p),1)) 
    cons_p1.start() 
    cons_p2 = multiprocessing.Process(target=consumer,args=((output_p,input_p),2))
    cons_p2.start() 

    output_p.close()

    sequence = [i for i in range(10)]
    producer(sequence, input_p)
    input_p.close()

    cons_p1.join()
    cons_p2.join()

Answers (1)

The Pjot

Do not share one pipe between multiple consumers. The documentation explicitly warns that data may become corrupted when two processes read from (or write to) the same end of a pipe at the same time, and that is exactly what your two consumers do:

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

So use a Queue, or even a JoinableQueue:

from multiprocessing import Process, JoinableQueue
from queue import Empty  # Python 3; in Python 2 this module is named Queue
import time


def consumer(que, pid):
    while True:
        try:
            item = que.get(timeout=10)
            print("%s consume:%s" % (pid, item))
            que.task_done()
        except Empty:
            break
    print('Consumer done')


def producer(sequence, que):
    for item in sequence:
        print('produce:', item)
        que.put(item) 
        time.sleep(1)

if __name__ == '__main__':
    que = JoinableQueue()

    # create two consumer processes
    cons_p1 = Process(target=consumer, args=(que, 1)) 
    cons_p1.start() 
    cons_p2 = Process(target=consumer, args=(que, 2))
    cons_p2.start() 

    sequence = [i for i in range(10)]
    producer(sequence, que)
    que.join()
    cons_p1.join()
    cons_p2.join()
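
If you really want to stay with pipes, one way to respect the rule quoted above is to give every consumer its own pipe, so that each end is only ever touched by one process. The following is only a sketch under a few assumptions of mine, not something from the question: the names run, SENTINEL and the ctx parameter are made up for illustration, items are handed out round-robin, and None is assumed never to be a real item. It uses a sentinel instead of waiting for EOFError, because with the fork start method the children inherit copies of the write ends and EOF may never arrive unless every copy is closed.

```python
import multiprocessing

# Hypothetical end-of-stream marker; assumes None is never a real item.
SENTINEL = None


def consumer(conn, cid):
    """Consume items from a private connection until the sentinel arrives."""
    seen = []
    while True:
        item = conn.recv()
        if item is SENTINEL:
            break
        print("%s consume:%s" % (cid, item))
        seen.append(item)
    conn.send(seen)  # report back over the same duplex pipe
    conn.close()
    print('Consumer done')


def run(sequence, n_consumers=2, ctx=multiprocessing):
    """Fan items out round-robin, one pipe per consumer.

    ctx is an illustrative parameter: the multiprocessing module itself or
    a context from multiprocessing.get_context(). Returns {id: [items]}.
    """
    workers = []
    for cid in range(1, n_consumers + 1):
        parent_end, child_end = ctx.Pipe()  # duplex by default
        proc = ctx.Process(target=consumer, args=(child_end, cid))
        proc.start()
        child_end.close()  # the parent only ever uses its own end
        workers.append((cid, parent_end, proc))

    for i, item in enumerate(sequence):
        print('produce:', item)
        workers[i % n_consumers][1].send(item)

    results = {}
    for cid, parent_end, proc in workers:
        parent_end.send(SENTINEL)          # tell this consumer to stop
        results[cid] = parent_end.recv()   # collect what it consumed
        proc.join()
        parent_end.close()
    return results


if __name__ == '__main__':
    print(run(range(10)))
```

With two consumers, run(range(10)) hands the even numbers to consumer 1 and the odd numbers to consumer 2. The trade-off compared to a Queue is that the producer decides who gets which item, so a slow consumer is not relieved by an idle one; if you need that kind of load balancing, the Queue version above is the simpler choice.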
