ckhung


Two condition variables attached to the same lock: different behaviors in Python 2 and Python 3

I am trying to write the classic producer-consumer program in Python. Here is the C code I referenced: http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture16.html https://web.stanford.edu/~ouster/cgi-bin/cs140-spring14/lecture.php?topic=locks

After installing colored with pip install colored and pip3 install colored, I run this program on Lubuntu 18.04. When it is run as "python3 producer-consumer.py" (i.e. with Python 3.6.7), the program hangs after a few iterations at either

"queue is empty, stop consuming"

or at

"queue is full, stop producing"

Note: Ctrl-C won't kill the program. You need to press Ctrl-Z and then run kill -9 %1 to kill it.

The strange thing is that when it is run as "python producer-consumer.py" (i.e. with Python 2.7.15rc1), it runs almost as expected. But after running for a sufficiently long time, it raises an IndexError exception at either

queue.append(item)

or at

item = queue.pop(0)

Before that, it runs as expected for quite a few minutes: 3 producers and 3 consumers of various colors work on the same small-capacity queue, frequently running into both the empty-queue case and the full-queue case.

Regardless of whether my program is correct, the different behaviors in Python 2 and Python 3 seem to suggest that there is a bug in Python 3's (and maybe Python 2's, too) implementation of the condition variable. Or is this difference actually expected for certain buggy programs? Thanks in advance.

from threading import Thread, Lock, Condition
import time
from random import random, randint
import colored
from colored import stylize

queue = []
CAPACITY = 3

qlock = Lock()
item_ok = Condition(qlock)
space_ok = Condition(qlock)

class ProducerThread(Thread):
    def run(self):
        global queue
        mycolor = self.name
        while True:
            qlock.acquire()
            if len(queue) >= CAPACITY:
                print(stylize('queue is full, stop producing', colored.fg(mycolor)))
                while space_ok.wait():
                    pass
                print(stylize('space available again, start producing', colored.fg(mycolor)))
            item = chr(ord('A')+randint(0,25))
            print(stylize('['+' '.join(queue)+'] <= '+item, colored.fg( mycolor)))
            queue.append(item)
            item_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)


class ConsumerThread(Thread):
    def run(self):
        global queue
        mycolor = self.name
        while True:
            qlock.acquire()
            if not queue:
                print(stylize('queue is empty, stop consuming', colored.fg(mycolor)))
                while item_ok.wait():
                    pass
                print(stylize('food is available, start consuming', colored.fg(mycolor)))
            item = queue.pop(0)
            print(stylize(item+' <= ['+' '.join(queue)+']', colored.fg( mycolor)))
            space_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)

ProducerThread(name='red').start()
ProducerThread(name='green').start()
ProducerThread(name='blue').start()
ConsumerThread(name='cyan').start()
ConsumerThread(name='magenta').start()
ConsumerThread(name='yellow').start()

[Screenshot: output after a few minutes of running under Python 2]
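A note on the setup in the title: because item_ok and space_ok are both built on qlock, acquiring either condition (or qlock itself) takes the same mutex, while notify() on one condition only wakes threads waiting on that particular condition. A minimal sketch of the shared-lock part (Python 3, standard library only; the variable names held and got_it are just for illustration):

```python
import threading

qlock = threading.Lock()
item_ok = threading.Condition(qlock)    # both conditions wrap the same Lock
space_ok = threading.Condition(qlock)

with item_ok:                            # entering the condition acquires qlock
    held = qlock.locked()
    # A non-blocking acquire through the *other* condition fails,
    # because it tries to take the very same, already held, lock.
    got_it = space_ok.acquire(blocking=False)

print(held, got_it)                      # True False
print(qlock.locked())                    # False: released on leaving the with block
```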


Answers (1)

Felix

The main issue in your code is that you don't re-check whether the list is empty / full after a thread has been notified. This can be a problem in the following situation:

c1 and c2 are consumer threads, p1 is a producer thread. The queue is empty at the beginning. c1 is awake (currently in the last line, time.sleep(...)) while c2 is waiting to be notified (in the line while item_ok.wait():).

  1. p1 adds an item to the queue and calls item_ok.notify()
  2. c1 finishes sleeping, acquires the lock and sees a non-empty queue, so it doesn't wait
  3. c2 gets notified and tries to re-acquire the lock
  4. c1 consumes the item from the queue and releases the lock
  5. c2 acquires the lock and, without re-checking, tries to pop from an empty queue
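The interleaving above can be forced deterministically with a small sketch (Python 3; the single Condition, the ready Event and the consumer_with_if function are illustrative and not from the question). The main thread plays both p1 and c1: it appends an item, notifies, and takes the item back before releasing the lock, which has the same effect as steps 2 to 4. The woken consumer, which only checks the queue once with an if, then finds an empty list:

```python
import threading

queue = []
cond = threading.Condition()
ready = threading.Event()
outcome = []

def consumer_with_if():                # plays c2
    with cond:
        if not queue:                  # predicate checked only once (the bug)
            ready.set()                # still holding the lock at this point
            cond.wait()
        try:
            outcome.append(queue.pop(0))
        except IndexError:
            outcome.append('IndexError: pop from empty list')

c2 = threading.Thread(target=consumer_with_if)
c2.start()
ready.wait()                           # c2 has checked the queue...
with cond:                             # ...and this succeeds only once c2 is inside wait()
    queue.append('A')                  # p1 produces an item
    cond.notify()                      # and wakes c2
    queue.pop(0)                       # c1 takes the item before c2 gets the lock
c2.join()
print(outcome)                         # ['IndexError: pop from empty list']
```

Replacing the if with while makes c2 go back to waiting instead (and, in this reduced demo, wait forever, since nothing else is produced).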

The solution

Instead of calling .wait() in the while condition (which is nonsensical because, when no timeout is given, it always returns None on Python 2 and always True on Python 3.2+; see the documentation of threading.Condition.wait()), call .wait() in the loop body and put the check whether the queue is full / empty in the while condition:
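A quick way to check the Python 3 return values (a sketch for Python 3.2+; the notifier helper is only for illustration): wait() returns False only when a timeout expires and True when it was woken by notify(), so without a timeout it can only ever return True.

```python
import threading

cond = threading.Condition()

# Nobody calls notify(), so the timeout expires and wait() returns False.
with cond:
    timed_out = cond.wait(timeout=0.05)

def notifier():
    with cond:          # can only acquire the lock once the main thread is inside wait()
        cond.notify()

with cond:
    threading.Thread(target=notifier).start()
    woken = cond.wait(timeout=5)   # woken by notify() long before the timeout

print(timed_out, woken)            # False True
```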

while not queue:
    print('queue is empty, stop consuming')
    item_ok.wait()
    print('trying again')

With this approach (which is also the pattern used in the threading documentation), a thread re-checks whether the queue is still non-empty / not full after it has been woken up and has re-acquired the lock. If the condition no longer holds (because another thread ran in between), the thread waits on the condition again.
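On Python 3.2+, Condition.wait_for(predicate) implements exactly this loop, so the consumer's check can also be written as below. This is a minimal sketch: take() and put() are hypothetical helpers, and wait_for() must be called with the lock held, which the with statement takes care of.

```python
import threading

queue = []
qlock = threading.Lock()
item_ok = threading.Condition(qlock)
results = []

def take():
    with item_ok:                                # acquires qlock
        # Equivalent to:  while not queue: item_ok.wait()
        item_ok.wait_for(lambda: len(queue) > 0)
        results.append(queue.pop(0))

def put(item):
    with item_ok:
        queue.append(item)
        item_ok.notify()

consumer = threading.Thread(target=take)
consumer.start()
put('A')            # works whether or not the consumer is already waiting
consumer.join()
print(results)      # ['A']
```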

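By the way, the difference between Python 2 and 3 described above is also the reason why your program behaves differently on the two versions. On Python 3, wait() returns True, so "while item_ok.wait(): pass" never exits: every thread that enters one of those loops waits forever, and once all six threads are stuck, the program hangs. On Python 2, wait() returns None, so the loop exits after the first wakeup, and because the queue isn't re-checked, a consumer can call pop(0) on a list that another consumer has already emptied, which raises IndexError. That's documented behavior and not a bug in the implementation.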
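The Python 3 hang can be reproduced with a reduced sketch (Python 3.2+; looping_waiter and the Event handshake are illustrative). It uses the same loop shape as the question, but with a timeout added so that the demo can end: every notification makes wait() return True and the loop simply goes back to waiting; only the final, timed-out wait() returns False.

```python
import threading
import time

cond = threading.Condition()
ready = threading.Event()
wakeups = []

def looping_waiter():
    with cond:
        ready.set()                        # set while still holding the lock
        # Same shape as "while item_ok.wait(): pass", but with a timeout so
        # the demo terminates: only a timed-out wait() returns False.
        while cond.wait(timeout=1.0):
            wakeups.append('notified, kept looping')

t = threading.Thread(target=looping_waiter)
t.start()
ready.wait()
for i in range(3):
    with cond:                             # acquired only once the waiter is inside wait()
        cond.notify()
    while len(wakeups) < i + 1:            # let the waiter loop back into wait()
        time.sleep(0.01)
t.join()                                   # returns about 1 s after the last notify
print(len(wakeups))                        # 3
```

Without the timeout, the loop would never end, which is exactly what happens to the threads in the original program.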

The fixed code (which has been running fine on my machine for the last 30 minutes) for the producer and consumer threads looks like this (I removed the colors because I didn't want to install a package):

class ProducerThread(Thread):
    def run(self):
        global queue
        while True:
            qlock.acquire()
            while len(queue) >= CAPACITY:
                print('queue is full, stop producing')
                space_ok.wait()
                print('trying again')
            item = chr(ord('A')+randint(0,25))
            print('['+' '.join(queue)+'] <= '+item)
            queue.append(item)
            item_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            qlock.acquire()
            while not queue:
                print('queue is empty, stop consuming')
                item_ok.wait()
                print('trying again')
            item = queue.pop(0)
            print(item+' <= ['+' '.join(queue)+']')
            space_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)
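For reference, here is a self-contained sketch of the same fix that runs to completion. Assumptions: the colors and prints are dropped, each thread handles a fixed, hypothetical ITEMS_PER_THREAD items instead of looping forever, the sleeps are shortened, and the with statement replaces the explicit acquire()/release() calls so the lock is also released if an exception occurs. It records the largest queue length it saw, which should never exceed CAPACITY:

```python
import threading
import time
from random import random, randint

CAPACITY = 3
ITEMS_PER_THREAD = 20                    # hypothetical bound so the demo terminates

queue = []
qlock = threading.Lock()
item_ok = threading.Condition(qlock)
space_ok = threading.Condition(qlock)
max_len_seen = 0
consumed = []

def producer():
    global max_len_seen
    for _ in range(ITEMS_PER_THREAD):
        with qlock:                          # released even if an exception occurs
            while len(queue) >= CAPACITY:    # re-check after every wakeup
                space_ok.wait()
            queue.append(chr(ord('A') + randint(0, 25)))
            max_len_seen = max(max_len_seen, len(queue))
            item_ok.notify()
        time.sleep(random() / 100)

def consumer():
    for _ in range(ITEMS_PER_THREAD):
        with qlock:
            while not queue:                 # re-check after every wakeup
                item_ok.wait()
            consumed.append(queue.pop(0))
            space_ok.notify()
        time.sleep(random() / 100)

threads = [threading.Thread(target=producer) for _ in range(3)]
threads += [threading.Thread(target=consumer) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(consumed), max_len_seen, queue)   # 60, a value of at most 3, and []
```

Since the producers create exactly as many items as the consumers take, every thread eventually finishes and the queue ends up empty.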

Bonus

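You mentioned that the program can't be exited using Ctrl-C (KeyboardInterrupt). To fix this, you can make the threads "daemons", which means they no longer keep the program alive: once the main thread ends, the daemon threads are stopped and the process exits. The main thread then stays in an interruptible sleep loop so that it is the one that receives the KeyboardInterrupt. Using the code above, Ctrl-C works fine to end the program (the daemon keyword argument needs Python 3.3+; on Python 2, set t.daemon = True before calling t.start()):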

ProducerThread(name='red', daemon=True).start()
ProducerThread(name='green', daemon=True).start()
ProducerThread(name='blue', daemon=True).start()
ConsumerThread(name='cyan', daemon=True).start()
ConsumerThread(name='magenta', daemon=True).start()
ConsumerThread(name='yellow', daemon=True).start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Exiting")
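Daemon threads are stopped abruptly at shutdown, wherever they happen to be. If you'd rather have the threads finish cleanly, the threading documentation suggests making them non-daemonic and using a signalling mechanism such as an Event. A minimal sketch of that alternative, showing only consumers (the stop Event is a hypothetical addition, not part of the code above):

```python
import threading
import time

queue = []
cond = threading.Condition()
stop = threading.Event()                 # hypothetical shutdown flag

def consumer():
    while True:
        with cond:
            # Wait until there is work *or* a shutdown has been requested.
            cond.wait_for(lambda: queue or stop.is_set())
            if stop.is_set():
                return                   # leave the loop cleanly
            queue.pop(0)

threads = [threading.Thread(target=consumer) for _ in range(3)]
for t in threads:
    t.start()

try:
    time.sleep(0.2)                      # stands in for "while True: time.sleep(1)"
except KeyboardInterrupt:
    print("Exiting")
finally:
    with cond:                           # set the flag under the lock: no lost wakeup
        stop.set()
        cond.notify_all()                # wake every waiter so it sees the flag
    for t in threads:
        t.join()

print(any(t.is_alive() for t in threads))   # False
```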

Does this solve your problem? Please comment below.
