Stefan Bossbaly

Reputation: 6794

Python Locking Critical Section

I am trying to use the multiprocessing library in Python to process "tests" concurrently. I have a list of tests stored in the variable test_files. I want the workers to remove a test from test_files and call the process_test function on it. However, when I run this code, both processes run the same test. It seems that I am not accessing test_files in a thread-safe manner. What am I doing wrong?

Code

import multiprocessing

def process_worker(lock, test_files):
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section of code
        lock.acquire()
        try:
            if len(test_files) != 0:
                test_file = test_files.pop()
        finally:
            lock.release()
        # End critical section of code

        # If there is another test in the queue process it
        if test_file is not None:
            print "Running test {0} on worker {1}".format(test_file, multiprocessing.current_process().name)
            process_test(test_file)
        else:
            # No more tests to process
            return

# Mutex for workers
lock = multiprocessing.Lock()

# Declare our workers
p1 = multiprocessing.Process(target = process_worker, name = "Process 1", args=(lock, test_files))
p2 = multiprocessing.Process(target = process_worker, name = "Process 2", args=(lock, test_files))

# Start processing
p1.start()
p2.start()

# Block until both workers finish
p1.join()
p2.join()

Output

Running test "BIT_Test" on worker Process 1
Running test "BIT_Test" on worker Process 2

Upvotes: 1

Views: 4853

Answers (2)

Paul Rooney

Reputation: 21609

You could also use a multiprocessing.Manager, which gives you a proxy list that is actually shared between the processes:

import multiprocessing

def process_worker(lock, test_files):
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section of code
        lock.acquire()
        try:
            if len(test_files) != 0:
                test_file = test_files.pop()
        finally:
            lock.release()
        # End critical section of code

        # If there is another test in the queue process it
        if test_file is not None:
            print "Running test %s on worker %s" % (test_file, multiprocessing.current_process().name)
            #process_test(test_file)
        else:
            # No more tests to process
            return

# Mutex for workers
lock = multiprocessing.Lock()
manager = multiprocessing.Manager()

test_files = manager.list(['f1', 'f2', 'f3'])

# Declare our workers
p1 = multiprocessing.Process(target = process_worker, name = "Process 1", args=(lock, test_files))
p2 = multiprocessing.Process(target = process_worker, name = "Process 2", args=(lock, test_files))

# Start processing
p1.start()
p2.start()

# Block until both workers finish
p1.join()
p2.join()

Upvotes: 3

dano

Reputation: 94881

Trying to share a list like this is not the right approach here; each worker process gets its own copy of a plain list, so popping an item in one process has no effect on the other, which is why both workers ran the same test. You should use a process-safe data structure, like multiprocessing.Queue, or better yet, use a multiprocessing.Pool and let it handle the queuing for you. What you're doing is perfectly suited for Pool.map:

import multiprocessing

def process_worker(test_file):
    print "Running test {0} on worker {1}".format(test_file, multiprocessing.current_process().name)
    process_test(test_file)


p = multiprocessing.Pool(2) # 2 processes in the pool
# map puts each item from test_files in a Queue, lets the
# two processes in our pool pull each item from the Queue,
# and then execute process_worker with that item as an argument.
p.map(process_worker, test_files)
p.close()
p.join()

Much simpler!
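
If you do want to manage the work queue yourself, here is a rough sketch of the multiprocessing.Queue approach mentioned above. It assumes the same test_files list and process_test function from the question, and uses one None sentinel per worker to signal that there is no more work:

import multiprocessing

def process_worker(queue):
    # Keep pulling test files off the shared queue until we see the
    # None sentinel, which tells this worker to stop
    while True:
        test_file = queue.get()
        if test_file is None:
            return
        print "Running test {0} on worker {1}".format(test_file, multiprocessing.current_process().name)
        process_test(test_file)

queue = multiprocessing.Queue()
for test_file in test_files:
    queue.put(test_file)

# One sentinel per worker so every worker's loop terminates
num_workers = 2
for _ in range(num_workers):
    queue.put(None)

workers = [multiprocessing.Process(target=process_worker,
                                   name="Process {0}".format(i + 1),
                                   args=(queue,))
           for i in range(num_workers)]

for w in workers:
    w.start()
for w in workers:
    w.join()

Unlike a plain list, a Queue really is shared between processes, so each test is handed to exactly one worker.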

Upvotes: 4
