Kasravnd

multithreading check membership in Queue and stop the threads

I want to iterate over a list using two threads, one starting from the front and the other from the back, and put the elements in a Queue on each iteration. But before putting a value in the Queue, I need to check whether that value already exists in the Queue (which happens when the other thread has already put it there). When this happens, I need to stop the thread and return the list of values that each thread traversed.

This is what I have tried so far:

from Queue import Queue
from threading import Thread, Event

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

main_path = Queue()

def is_in_queue(x, q):
    with q.mutex:
        return x in q.queue

def a(main_path, g, l=[]):
    for i in g:
        l.append(i)
        print 'a'
        if is_in_queue(i, main_path):
            return l
        main_path.put(i)

def b(main_path, g, l=[]):
    for i in g:
        l.append(i)
        print 'b'
        if is_in_queue(i, main_path):
            return l
        main_path.put(i)

g=['a','b','c','d','e','f','g','h','i','j','k','l']

t1 = ThreadWithReturnValue(target=a, args=(main_path,g))
t2 = ThreadWithReturnValue(target=b, args=(main_path,g[::-1]))
t2.start()
t1.start()
# Wait for all produced items to be consumed
print main_path.join()

I used ThreadWithReturnValue, a custom Thread subclass whose join() returns the value returned by the target function.
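For comparison, here is a Python 3 sketch of the same idea. It is an illustrative rewrite, not the question's code: the Python 2 version reads the name-mangled _Thread__target, _Thread__args and _Thread__kwargs attributes, which Python 3's threading.Thread does not have, so this version keeps its own references to the target and its arguments (the attribute names _func, _func_args, _func_kwargs and _return_value are hypothetical).

```python
import threading


class ThreadWithReturnValue(threading.Thread):
    """Thread whose join() returns whatever the target function returned."""

    def __init__(self, target, args=(), kwargs=None):
        super().__init__()
        # Keep our own references instead of relying on Thread's private attributes.
        self._func = target
        self._func_args = args
        self._func_kwargs = kwargs or {}
        self._return_value = None

    def run(self):
        # Runs in the new thread; store the result for join() to hand back.
        self._return_value = self._func(*self._func_args, **self._func_kwargs)

    def join(self, timeout=None):
        super().join(timeout)
        return self._return_value


t = ThreadWithReturnValue(target=sum, args=([1, 2, 3],))
t.start()
print(t.join())  # 6
```

Because join() only returns the stored value, calling it again after the thread has finished gives the same result.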

And for the membership check I used the following function:

def is_in_queue(x, q):
    with q.mutex:
        return x in q.queue

Now, if I start t1 first and then t2, I get 12 a's, then one b, and then nothing happens and I have to terminate Python manually!

But if I start t2 first and then t1, I get the following output:

b
b
b
b
 ab

ab
b

b
b
 b
a
a

So my questions are: why does Python behave differently in these two cases, and how can I terminate the threads and make them communicate with each other?

Upvotes: 3

Answers (2)

abarnert

Before we get into bigger problems, you're not using Queue.join right.

The whole point of this function is that a producer who adds a bunch of items to a queue can wait until the consumer or consumers have finished working on all of those items. This works by having the consumer call task_done after they finish working on each item that they pulled off with get. Once there have been as many task_done calls as put calls, the queue is done. You're not doing a get anywhere, much less a task_done, so there's no way the queue can ever be finished. So, that's why you block forever after the two threads finish.
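For contrast, here is a minimal Python 3 sketch of the pattern Queue.join is designed for (in Python 3 the module is named queue rather than Queue). A consumer thread calls get and then task_done for every item, so join returns once every put has been matched. The consumer function, the processed list and the None sentinel are illustrative choices, not part of the question.

```python
import queue
import threading

q = queue.Queue()
processed = []


def consumer():
    # Pull items until the None sentinel arrives.
    while True:
        item = q.get()
        if item is None:
            q.task_done()  # the sentinel was put() too, so it must be marked done
            break
        processed.append(item.upper())
        q.task_done()  # exactly one task_done() per successful get()


worker = threading.Thread(target=consumer)
worker.start()

for letter in ['a', 'b', 'c']:
    q.put(letter)
q.put(None)  # tell the consumer to stop

q.join()  # returns only after task_done() has been called for every put()
worker.join()
print(processed)  # ['A', 'B', 'C']
```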


The first problem here is that your threads are doing almost no work outside of the actual synchronization. If the only thing they do is fight over a queue, only one of them is going to be able to run at a time.

Of course that's common in toy problems, but you have to think through your real problem:

  • If you're doing a lot of I/O work (listening on sockets, waiting for user input, etc.), threads work great.
  • If you're doing a lot of CPU work (calculating primes), threads don't help in CPython because of the GIL, but processes do.
  • If you're actually primarily dealing with synchronizing separate tasks, neither one is going to work well (and processes will be worse). It may still be simpler to think in terms of threads, but it'll be the slowest way to do things. You may want to look into coroutines; Greg Ewing has a great demonstration of how to use yield from to build things like schedulers or many-actor simulations out of coroutines.
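As a rough illustration of the coroutine idea applied to this problem, here is a sketch (plain generators, not Greg Ewing's code) in which the two threads are replaced by two generators that a simple loop steps through in turn. Since only one generator runs at any moment, the check-then-add needs no lock. The names walker and run_round_robin are hypothetical.

```python
def walker(items, seen, traversed):
    """Record each item; stop at the first item already in `seen`."""
    for item in items:
        traversed.append(item)
        if item in seen:
            return  # the other walker got here first
        seen.add(item)
        yield  # hand control back to the scheduler loop


def run_round_robin(items):
    seen = set()
    front, back = [], []
    tasks = [walker(items, seen, front), walker(items[::-1], seen, back)]
    while tasks:
        for task in list(tasks):  # iterate over a copy so we can remove
            try:
                next(task)  # run one step of this walker
            except StopIteration:
                tasks.remove(task)
    return front, back


g = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']
print(run_round_robin(g))
# (['a', 'b', 'c', 'd', 'e', 'f', 'g'], ['l', 'k', 'j', 'i', 'h', 'g', 'f'])
```

The strict alternation makes the result deterministic: each walker claims six items and then stops on the seventh, which the other one has already claimed.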

Next, as I alluded to in your previous question, making threads (or processes) work efficiently with shared state requires holding locks for as short a time as possible.

So, if you have to search a whole queue under a lock, that had better be a constant-time search, not a linear-time search. That's why I suggested using something like an OrderedSet recipe rather than a plain sequence like the collections.deque inside the stdlib's Queue.Queue. Then this function:

def is_in_queue(x, q):
    with q.mutex:
        return x in q.queue

… is only blocking the queue for a tiny fraction of a second—just long enough to look up a hash value in a table, instead of long enough to compare every element in the queue against x.
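Here is a minimal Python 3 sketch of that suggestion, using collections.OrderedDict as a stand-in for an OrderedSet recipe (the stdlib has no OrderedSet). It overrides the _init, _qsize, _put and _get hooks that CPython's queue.Queue uses for its underlying storage; those hooks are implementation details rather than documented API, and items must be hashable. The class name SetQueue is hypothetical.

```python
import queue
from collections import OrderedDict


class SetQueue(queue.Queue):
    """FIFO queue stored in an OrderedDict: insertion order gives FIFO
    behaviour, and the hash table makes `x in q.queue` O(1)."""

    def _init(self, maxsize):
        self.queue = OrderedDict()

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        # A duplicate item does not create a second entry.
        self.queue[item] = None

    def _get(self):
        # popitem(last=False) removes the oldest entry (FIFO).
        return self.queue.popitem(last=False)[0]


def is_in_queue(x, q):
    with q.mutex:  # held only for a single hash lookup
        return x in q.queue


q = SetQueue()
for letter in 'abc':
    q.put(letter)
print(is_in_queue('b', q), is_in_queue('z', q))  # True False
print(q.get(), q.get())  # a b
```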


Finally, I tried to explain race conditions on your other question, but let me try again.

You need a lock around every complete "transaction" in your code, not just around the individual operations.

For example, if you do this:

with queue locked:
    see if x is in the queue
if x was not in the queue:
    with queue locked:
        add x to the queue

… then it's always possible that x was not in the queue when you checked, but in the time between when you unlocked it and relocked it, someone added it. This is exactly why it's possible for both threads to stop early.

To fix this, you need to put a lock around the whole thing:

with queue locked:
    if x is not in the queue:
        add x to the queue

Of course, this goes directly against what I said before about locking the queue for as short a time as possible. In a nutshell, that's what makes multithreading hard. It's easy to write safe code that locks everything for as long as might conceivably be necessary, but then your code ends up using only a single core while all the other threads are blocked waiting for the lock. And it's easy to write fast code that locks everything as briefly as possible, but then it's unsafe and you get garbage values or even crashes all over the place. Figuring out what needs to be a transaction, how to minimize the work inside those transactions, and how to handle the multiple locks you'll probably need without deadlocking… that's not so easy.
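Applied to the question's problem, a minimal Python 3 sketch of the whole-transaction approach might look like the following. Assumptions: a plain set guarded by a threading.Lock replaces the Queue (nothing in the question actually consumes the queued items), concurrent.futures.ThreadPoolExecutor replaces ThreadWithReturnValue for collecting each thread's list, and the names SeenSet, add_if_absent and walk are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SeenSet:
    """A set whose check-and-add happens as one locked transaction."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = set()

    def add_if_absent(self, item):
        # Check and insert under the same lock, so no other thread can add
        # `item` between the membership test and the insertion.
        with self._lock:
            if item in self._items:
                return False
            self._items.add(item)
            return True


def walk(items, seen):
    traversed = []
    for item in items:
        traversed.append(item)
        if not seen.add_if_absent(item):
            break  # the other thread already claimed this item: stop
    return traversed


g = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']
seen = SeenSet()
with ThreadPoolExecutor(max_workers=2) as pool:
    front = pool.submit(walk, g, seen)
    back = pool.submit(walk, g[::-1], seen)
    front_list, back_list = front.result(), back.result()
print(front_list)
print(back_list)
```

Where exactly the two threads meet depends on scheduling, but every item is claimed by exactly one thread, and each thread stops at the first item the other has already claimed.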

Upvotes: 3

Ami Tavory

A couple of things that I think can be improved:

  1. Due to the GIL, you might want to use the multiprocessing module rather than threading. In general, CPython threads will not speed up CPU-intensive work. (Depending on the exact context of your question, it's also possible that multiprocessing won't either, but threading almost certainly won't.)
  2. A function like your is_in_queue would likely lead to high contention.

The locked time seems linear in the number of items that need to be traversed:

def is_in_queue(x, q):
    with q.mutex:
        return x in q.queue

So, instead, you could possibly do the following.

Use multiprocessing with a shared dict:

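from multiprocessing import Process, Manager

# Fn definitions and such: p1(d) and p2(d) each receive the shared dict

if __name__ == '__main__':
    # The guard is needed on platforms that start child processes by
    # re-importing the main module (e.g. Windows).
    manager = Manager()
    d = manager.dict()

    # New names, so the Process objects don't shadow the functions p1/p2
    proc1 = Process(target=p1, args=(d,))
    proc2 = Process(target=p2, args=(d,))
    proc1.start()
    proc2.start()
    proc1.join()
    proc2.join()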

Within each function, check for the item like this:

def p1(d):

    # Stuff

    if 'foo' in d:
        return 

Upvotes: 2
