user1179317
user1179317

Reputation: 2903

Use generator to iterate through data from a multiprocess

I would like to do the following using multiprocessing instead of subprocess.Popen, because I cannot pass Python objects between processes with Popen. My simple example below does not pass any objects, but that is what I ultimately want to do.

Sample code is:

main.py

import subprocess


class ProcReader():
    """Iterate over the stdout lines of a Python script run in a child process.

    Each item produced is one raw line as ``bytes`` (including its trailing
    newline), exactly as returned by ``readline()`` on the child's stdout pipe.
    """

    def __init__(self, python_file):
        """Start ``python <python_file>`` with its stdout connected to a pipe."""
        self.proc = subprocess.Popen(['python', python_file], stdout=subprocess.PIPE)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next line of child output; raise StopIteration at EOF.

        On EOF the pipe is closed and the child is reaped with ``wait()``.
        Because all output has already been read, ``wait()`` cannot deadlock
        on a full pipe here.
        """
        stdout = self.proc.stdout
        # Once exhausted, keep raising StopIteration as the iterator protocol
        # requires, instead of calling readline() on a closed pipe.
        if stdout.closed:
            raise StopIteration
        line = stdout.readline()
        if not line:
            stdout.close()
            self.proc.wait()
            raise StopIteration
        return line


if __name__ == "__main__":
    r1 = ProcReader("test1.py")
    r2 = ProcReader("test2.py")
    r3 = ProcReader("test3.py")

    # zip() advances all three readers in lockstep and stops at the shortest.
    for l1, l2, l3 in zip(r1, r2, r3):
        # Each line is bytes like b"test1,0\n": decode, trim, split into fields.
        d1, d2, d3 = (line.decode('utf-8').strip().split(",")
                      for line in (l1, l2, l3))
        # Print "<name>:<value>" for each reader, including d3's name field.
        print(f"{d1[0]}:{d1[1]},{d2[0]}:{d2[1]},{d3[0]}:{d3[1]}")

test#.py

# Emit ten lines "test1,0" .. "test1,9" on stdout, one per line.
for n in range(10):
    print("test1,%d" % n)

My sample code is in Python 3, but I would like an equivalent using multiprocessing in Python 2.7. Should the equivalent also read from stdout, or should it use a queue and have a worker read from that queue?

Update---------

My example using multiprocessing:

import time
from multiprocessing import Process, Queue


def writer1(queue, count=10, delay=1):
    """Put ``count`` messages "test1,<n>" on *queue*, then a ``None`` sentinel.

    Sleeps *delay* seconds before each message. The defaults reproduce the
    original demo of ten messages one second apart. The trailing ``None``
    tells the reader this writer is finished, so the reader's
    ``queue.get()`` does not block forever waiting for more data.
    """
    for x in range(count):
        time.sleep(delay)
        queue.put("test1,{}".format(x))
    queue.put(None)

def writer2(queue, count=10, delay=2):
    """Put ``count`` messages "test2,<n>" on *queue*, then a ``None`` sentinel.

    Sleeps *delay* seconds before each message. The defaults reproduce the
    original demo of ten messages two seconds apart. The trailing ``None``
    tells the reader this writer is finished, so the reader's
    ``queue.get()`` does not block forever waiting for more data.
    """
    for x in range(count):
        time.sleep(delay)
        queue.put("test2,{}".format(x))
    queue.put(None)

def writer3(queue, count=10):
    """Put ``count`` messages "test3,<n>" on *queue*, then a ``None`` sentinel.

    Unlike the other writers this one does not sleep. The trailing ``None``
    tells the reader this writer is finished, so the reader's
    ``queue.get()`` does not block forever waiting for more data.
    """
    for x in range(count):
        queue.put("test3,{}".format(x))
    queue.put(None)

if __name__=='__main__':
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()

    # One queue per writer. Daemon children are terminated automatically when
    # the main process exits, so an early exit does not leave them running.
    writer_1 = Process(target=writer1, args=(q1,))
    writer_1.daemon = True
    writer_1.start()

    writer_2 = Process(target=writer2, args=(q2,))
    writer_2.daemon = True
    writer_2.start()

    writer_3 = Process(target=writer3, args=(q3,))
    writer_3.daemon = True
    writer_3.start()

    while True:
        # Queue.get() blocks until an item is available, so each row waits
        # for the slowest writer.
        msg1 = q1.get()
        msg2 = q2.get()
        msg3 = q3.get()
        # Writers are expected to put None on their queue when they are done
        # (an end-of-stream sentinel). Without it, the get() calls above
        # would block forever after the last message.
        if msg1 is None or msg2 is None or msg3 is None:
            break
        d1 = msg1.strip().split(",")
        d2 = msg2.strip().split(",")
        d3 = msg3.strip().split(",")
        print("{}:{},{}:{},{}:{}".format(d1[0], d1[1],
                                         d2[0], d2[1],
                                         d3[0], d3[1]))

I didn't realize that q1.get() blocks until an item is available; I added sleep calls to verify this. Also, how do I detect that a process has finished writing? At the end, the loop just seems to wait forever.

Upvotes: 0

Views: 104

Answers (1)

AKX
AKX

Reputation: 168967

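Adapting your second example to use sentinel objects (as suggested in my comment), you may be looking for something like the following. Each writer puts `None` on its queue when it is finished, and the reader stops reading a queue once it receives that sentinel: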

import os
import time
from multiprocessing import Process, Queue


def writer(queue, count=10, delay=0.1):
    """Put ``count`` "<pid>,<n>" messages on *queue*, followed by ``None``.

    The first field is the writer's process id, so the reader can tell the
    streams apart. Sleeps *delay* seconds before each message; the defaults
    match the original ten messages 0.1 s apart. The final ``None`` is the
    end-of-stream sentinel the reader uses to detect that this writer is done.
    """
    value = os.getpid()
    for x in range(count):
        time.sleep(delay)
        queue.put("{},{}".format(value, x))
    queue.put(None)


def spawn_process():
    """Start one daemonic ``writer`` process feeding a fresh queue.

    Returns a ``(process, queue)`` pair: the started Process object and the
    Queue it writes its messages to.
    """
    channel = Queue()
    worker = Process(target=writer, args=(channel,))
    worker.daemon = True
    worker.start()
    return worker, channel


if __name__ == "__main__":
    processes_and_queues = [spawn_process() for _ in range(3)]
    processes, queues = zip(*processes_and_queues)
    live_queues = list(queues)

    while live_queues:
        messages = []
        finished = []
        # Read exactly one item from every live queue per round. Finished
        # queues are collected and removed *after* the loop, because removing
        # from a list while iterating it skips the following element.
        for queue in live_queues:
            message = queue.get()
            if message is None:
                finished.append(queue)
            else:
                messages.append(message)
        for queue in finished:
            live_queues.remove(queue)
        # Print only complete rows (one message per process). Once any writer
        # has finished, the remaining writers' messages are still read, so
        # their queues are drained, but they are not printed.
        if len(messages) == len(processes):
            print(messages)

    # Every queue has delivered its sentinel, so each writer has finished
    # putting data and join() cannot block on unflushed queue items.
    for process in processes:
        process.join()

Example output (the process IDs will differ):

['51748,0', '51749,0', '51750,0']
['51748,1', '51749,1', '51750,1']
['51748,2', '51749,2', '51750,2']
['51748,3', '51749,3', '51750,3']
['51748,4', '51749,4', '51750,4']
['51748,5', '51749,5', '51750,5']
['51748,6', '51749,6', '51750,6']
['51748,7', '51749,7', '51750,7']
['51748,8', '51749,8', '51750,8']
['51748,9', '51749,9', '51750,9']

Upvotes: 1

Related Questions