Xu Siyuan

Reputation: 37

With multiprocessing, why is the execution sequential?

I am following some examples online to learn how to program in parallel, i.e., how to use multiprocessing.

I am running Windows 10 with Spyder 3.3.6 and Python 3.7.

import os
import time
from multiprocessing import Process, Queue

def square(numbers, queue):
    print("started square")
    for i in numbers:
        queue.put(i*i)
        print(i*i)
    print(f"{os.getpid()}")

def cube(numbers, queue):
    print("started cube")
    for i in numbers:
        queue.put(i*i*i)
        print(i*i*i)
    print(f"{os.getpid()}")

if __name__ == '__main__':

    numbers = range(5)
    queue = Queue()

    square_process = Process(target=square, args=(numbers,queue))
    cube_process = Process(target=cube, args=(numbers,queue))

    square_process.start()
    cube_process.start()

    square_process.join()
    cube_process.join()

    print("Already joined")
    while not queue.empty():
        print(queue.get())

I expected the output from the queue to be mixed or unpredictable, since it depends on how fast each process starts and how fast the first process finishes all of its statements. In theory, we could get something like 0, 1, 4, 8, 9, 27, 16, 64. But the actual output is sequential, like below:
0 1 4 9 16 0 1 8 27 64

Upvotes: 0

Views: 2050

Answers (4)

prithajnath

Reputation: 2115

The processes themselves are not doing anything CPU-heavy or network-bound, so they take a negligible amount of time to execute. My guess would be that by the time the second process has started, the first one has already finished. Processes do run in parallel, but because your tasks are so trivial, it gives the illusion that they are being run sequentially. You can introduce some randomness into your script to see the parallelism in action:

import os
from multiprocessing import Process, Queue
from random import randint

from time import sleep

def square(numbers, queue):

    print("started square")

    for i in numbers:
        if randint(0,1000)%2==0:
            sleep(3)
        queue.put(i*i)
        print(i*i)
    print(f"square PID : {os.getpid()}")

def cube(numbers, queue):

    print("started cube")
    for i in numbers:
        if randint(0,1000)%2==0:
            sleep(3)
        queue.put(i*i*i)
        print(i*i*i)
    print(f"cube PID : {os.getpid()}")

if __name__ == '__main__':

    numbers = range(5)
    queue = Queue()

    square_process = Process(target=square, args=(numbers,queue))
    cube_process = Process(target=cube, args=(numbers,queue))

    square_process.start()
    cube_process.start()

    square_process.join()
    cube_process.join()

    print("Already joined")
    while not queue.empty():
        print(queue.get())

Here the two processes randomly pause their execution, so while one process is paused the other one gets a chance to add a number to the queue (multiprocessing.Queue is thread- and process-safe). If you run this script a few times, you'll see that the order of the items in the queue is not always the same.
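Instead of re-running the script by hand, one can repeat the experiment inside a single script and count how many distinct orderings come back. This is a minimal sketch under a few assumptions not in the answer above: `producer` and `run_once` are hypothetical helper names, the pause is a short random `time.sleep(random.uniform(0, 0.02))` before every item rather than an occasional 3-second sleep, and the parent reads exactly the expected number of items with `get()` instead of looping on `queue.empty()`, which the `multiprocessing` docs describe as not reliable.

```python
import random
import time
from multiprocessing import Process, Queue


def producer(numbers, power, queue):
    # Hypothetical helper: puts i ** power for each i, with a short random
    # pause before each put (an assumed stand-in for the 3-second sleeps).
    for i in numbers:
        time.sleep(random.uniform(0, 0.02))
        queue.put(i ** power)


def run_once(numbers):
    # Run one square process and one cube process and return the values
    # in the order the parent received them.
    queue = Queue()
    procs = [Process(target=producer, args=(numbers, 2, queue)),
             Process(target=producer, args=(numbers, 3, queue))]
    for p in procs:
        p.start()
    # get() blocks until an item arrives, so reading exactly the expected
    # number of items avoids relying on queue.empty().
    items = [queue.get() for _ in range(2 * len(numbers))]
    for p in procs:
        p.join()
    return tuple(items)


if __name__ == '__main__':
    orderings = {run_once(range(5)) for _ in range(5)}
    print(f"{len(orderings)} distinct ordering(s) in 5 runs")
    for order in sorted(orderings):
        print(order)
```

The number of distinct orderings depends on the machine and the random pauses, so more than one is likely but not guaranteed.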

Upvotes: 0

Slavik Greshilov

Reputation: 1

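It looks like MisterMiyagi is right: starting an additional Python process is much more expensive than calculating the squares of 0 to 4. :) I've created a version of the code that uses a lock primitive, so now we can be sure that both processes are running at the same time: the square process blocks on the lock until the cube process has started and released it.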

import os
from multiprocessing import Process, Queue, Lock


def square(numbers, queue, lock):

    print("started square")
    # Block here, until lock release
    lock.acquire()
    for i in numbers:
        queue.put(i*i)
    print(f"{os.getpid()}")

def cube(numbers, queue, lock):
    # Finally release lock
    lock.release()
    print("started cube")
    for i in numbers:
        queue.put(i*i*i)
    print(f"{os.getpid()}")

if __name__ == '__main__':

    numbers = range(5)
    queue = Queue()
    lock = Lock()
    # Activate lock
    lock.acquire()
    square_process = Process(target=square, args=(numbers,queue,lock))
    cube_process = Process(target=cube, args=(numbers,queue,lock))

    square_process.start()
    cube_process.start()

    cube_process.join()
    square_process.join()

    print("Already joined")
    while not queue.empty():
        print(queue.get())

My output is:

0
0
1
4
1
9
8
16
27
64
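A symmetric alternative to the lock trick is a `multiprocessing.Barrier`, which makes each process wait until both have arrived and then releases them together. This is a swapped-in technique, not the method used in the answer above; it is a minimal sketch assuming two parties (the square and cube processes), and it reads exactly ten items with `get()` before joining rather than polling `queue.empty()`.

```python
import os
from multiprocessing import Barrier, Process, Queue


def square(numbers, queue, barrier):
    print(f"started square, pid {os.getpid()}")
    barrier.wait()  # block until the cube process has also reached its wait()
    for i in numbers:
        queue.put(i * i)


def cube(numbers, queue, barrier):
    print(f"started cube, pid {os.getpid()}")
    barrier.wait()  # block until the square process has also reached its wait()
    for i in numbers:
        queue.put(i * i * i)


if __name__ == '__main__':
    numbers = range(5)
    queue = Queue()
    barrier = Barrier(2)  # two parties: the square and cube processes

    procs = [Process(target=square, args=(numbers, queue, barrier)),
             Process(target=cube, args=(numbers, queue, barrier))]
    for p in procs:
        p.start()

    # Read the ten expected items before joining instead of polling empty().
    results = [queue.get() for _ in range(2 * len(numbers))]
    for p in procs:
        p.join()
    print(results)
```

Unlike the lock version, neither function has to know which one starts first; whichever arrives at `wait()` first simply waits for the other.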

Upvotes: 0

Sam Mason

Reputation: 16174

I'm pretty sure this is just because spinning up a process takes some time, so the processes tend to run one after the other.
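One way to check this claim is to time both parts directly. This is an illustrative sketch, not part of the original answer: it compares computing the five squares with the delay between creating a `Process` and the child running its first line. `report_start` and `measure` are hypothetical helper names, and `time.time()` is assumed to be adequate for the cross-process delay because wall-clock timestamps are comparable between processes (its resolution is coarse on some Windows versions).

```python
import time
from multiprocessing import Process, Queue


def report_start(queue, t0):
    # Hypothetical helper: report how long after the parent's timestamp
    # this child began running its target.
    queue.put(time.time() - t0)


def measure():
    queue = Queue()

    # Cost of the actual work in the question: five squares.
    w0 = time.perf_counter()
    squares = [i * i for i in range(5)]
    work_seconds = time.perf_counter() - w0

    # Cost of creating a child process and getting it to its first line.
    t0 = time.time()
    p = Process(target=report_start, args=(queue, t0))
    p.start()
    startup_seconds = queue.get()
    p.join()
    return squares, work_seconds, startup_seconds


if __name__ == '__main__':
    squares, work_seconds, startup_seconds = measure()
    print(f"squares: {squares}")
    print(f"work:    {work_seconds:.6f} s")
    print(f"startup: {startup_seconds:.6f} s")
```

The exact numbers depend on the machine and operating system, but the startup delay is typically orders of magnitude larger than the work itself.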

I rewrote it to give the jobs a better chance of running in parallel:

from multiprocessing import Process, Queue
from time import time, sleep


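def fn(queue, offset, start_time):
    # Wait until the shared start time so both processes begin together;
    # clamp at zero because sleep() raises ValueError for negative values
    # if this process started after start_time.
    sleep(max(0, start_time - time()))
    for i in range(10):
        queue.put(offset + i)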


if __name__ == '__main__':
    queue = Queue()
    start_time = time() + 0.1

    procs = []
    for i in range(2):
        args = (queue, i * 10, start_time)
        procs.append(Process(target=fn, args=args))

    for p in procs: p.start()
    for p in procs: p.join()

    while not queue.empty():
        print(queue.get())

I should note that I do get the nondeterministic output ordering you seemed to be expecting. I'm on Linux, so you might see something different on Windows, but I think it's unlikely.
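One platform difference that does matter here is the process start method. Windows only supports 'spawn', which launches a fresh interpreter and re-imports the main module, and the `multiprocessing` docs describe it as rather slow compared with 'fork' or 'forkserver', which are available on POSIX systems. The sketch below is an assumption-laden illustration, not part of the answer: `noop` and `time_start_join` are hypothetical helpers that print the default method and time creating, starting and joining an empty process under each method available on the current platform.

```python
import time
import multiprocessing as mp


def noop():
    # Does nothing; only the cost of creating the process is measured.
    pass


def time_start_join(method):
    # Time a full start() + join() of an empty process using one start method.
    ctx = mp.get_context(method)
    t0 = time.perf_counter()
    p = ctx.Process(target=noop)
    p.start()
    p.join()
    return time.perf_counter() - t0


if __name__ == '__main__':
    print("default start method:", mp.get_start_method())
    for method in mp.get_all_start_methods():
        print(f"{method:>10}: {time_start_join(method):.4f} s")
```

On Windows only 'spawn' will be listed; absolute timings vary by machine.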

Upvotes: 0

Gro

Reputation: 1683

There are a few things to understand here:

  • Two processes are executing the square and cube functions independently. Within each function, the items are produced in order, because that order is governed by the for loop.
  • The only random part is which process is executing, and adding what to the queue, at any given point in time. For example, the square process may be in its 5th iteration (i = 4) while the cube process is in its 2nd iteration (i = 1).
  • You are using a single Queue instance to collect items from the two processes that execute square and cube separately. Queues are first in, first out (FIFO), so when you get from the Queue (and print in the main process), the items come out in the order in which the queue received them.
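These points can be checked by tagging each item with the process that produced it. The `multiprocessing` docs state that objects enqueued by the same process always arrive in the expected order relative to each other, even though items from different processes can interleave. This is a minimal sketch; `produce` and `split_by_producer` are hypothetical helper names, and the parent reads exactly the expected number of items with `get()` before joining.

```python
from multiprocessing import Process, Queue


def produce(label, numbers, power, queue):
    # Each item records which process produced it.
    for i in numbers:
        queue.put((label, i ** power))


def split_by_producer(items):
    # Regroup the merged stream; the relative order within each label is kept.
    groups = {}
    for label, value in items:
        groups.setdefault(label, []).append(value)
    return groups


if __name__ == '__main__':
    numbers = range(15)
    queue = Queue()
    procs = [Process(target=produce, args=("square", numbers, 2, queue)),
             Process(target=produce, args=("cube", numbers, 3, queue))]
    for p in procs:
        p.start()
    merged = [queue.get() for _ in range(2 * len(numbers))]
    for p in procs:
        p.join()

    print("merged order:", [label[0] for label, _ in merged])  # s/c pattern varies
    groups = split_by_producer(merged)
    print(groups["square"] == [i ** 2 for i in numbers])  # True
    print(groups["cube"] == [i ** 3 for i in numbers])    # True
```

The interleaving of "s" and "c" in the merged order can change from run to run, while each producer's own sequence is always intact.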

Run the following updated version of your program to understand this better:

import os
from multiprocessing import Process, Queue

def square(numbers, queue):
    print("started square process id is %s" % os.getpid())
    for i in numbers:
        queue.put("Square of %s is %s " % (i, i*i))
        print("square: added %s in queue:" % i)

def cube(numbers, queue):
    print("started cube process id is %s" % os.getpid())
    for i in numbers:
        queue.put("Cube of %s is %s " % (i, i*i*i))
        print("cube: added %s in queue:" % i)


if __name__ == '__main__':

    numbers = range(15)
    queue = Queue()

    square_process = Process(target=square, args=(numbers,queue))
    cube_process = Process(target=cube, args=(numbers,queue))

    square_process.start()
    cube_process.start()

    square_process.join()
    cube_process.join()

    print("Already joined")
    while not queue.empty():
        print(queue.get())

Upvotes: 1
