Reputation: 69242
I would like to find a mechanism to easily report the progress of a Python thread. For example, if my thread had a counter, I would like to know the value of the counter once in a while, but, importantly, I only need to know the latest value, not every value that's ever gone by.
What I imagine to be the simplest solution is a single-value Queue, where every time I put a new value on it in the thread, it replaces the old value with the new one. Then when I do a get in the main program, it would only return the latest value.
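The single-value queue described above can be approximated with the standard library. The following is a minimal sketch, assuming exactly one producer thread: LatestValueQueue is a hypothetical name, and it wraps a queue.Queue(maxsize=1) so that each put() first discards any unread value.

```python
import queue


class LatestValueQueue:
    """Hypothetical single-slot queue: each put() replaces any unread value.

    Assumes a single producer thread. With several producers, put_nowait()
    could raise queue.Full if another producer refilled the slot in between.
    """

    def __init__(self):
        self._q = queue.Queue(maxsize=1)

    def put(self, value):
        try:
            self._q.get_nowait()   # discard the stale value, if there is one
        except queue.Empty:
            pass                   # the slot was already empty
        self._q.put_nowait(value)  # only this producer ever fills the slot

    def get(self, block=True, timeout=None):
        # Same contract as queue.Queue.get: waits for a value, or raises
        # queue.Empty when non-blocking / timed out and nothing is there.
        return self._q.get(block=block, timeout=timeout)


lvq = LatestValueQueue()
for count in range(1, 1001):
    lvq.put(count)
print(lvq.get(block=False))  # prints 1000: older values were discarded
```

Because the consumer only ever removes items, the slot is guaranteed to be free when the single producer calls put_nowait().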
Because I don't know how to do the above, what I do instead is put every counter value in a queue, and when I get, I get all the values until there are no more, and just keep the last. But this seems far from ideal, in that I'm filling the queue with thousands of values that I don't care about.
Here's an example of what I do now:
try:
    from queue import Queue, Empty   # Python 3
except ImportError:
    from Queue import Queue, Empty   # Python 2
from threading import Thread
from time import sleep

N = 1000

def fast(q):
    count = 0
    while count < N:
        sleep(.02)
        count += 1
        q.put(count)

def slow(q):
    while 1:
        sleep(5)  # sleep for a long time
        # read last item in queue
        val = None
        while 1:  # read all elements of queue, only saving last
            try:
                val = q.get(block=False)
            except Empty:
                break
        print(val)  # the last element read from the queue
        if val == N:
            break

if __name__ == "__main__":
    q = Queue()
    fast_thread = Thread(target=fast, args=(q,))
    fast_thread.start()
    slow(q)
    fast_thread.join()
My question is, is there a better approach?
Upvotes: 1
Views: 547
Reputation: 11624
In your special case, you may be overcomplicating the issue. If your variable is just a progress indicator of a single thread, and only this thread actually changes the variable, then it's safe to use a shared object to communicate the progress, as long as all other threads only read it.
I guess we have all read too many (rightful) warnings about race conditions and other pitfalls of shared state in concurrent programming, so we tend to overthink and add more precautions than are sometimes needed.
You could basically share a pre-constructed dict:
thread_progress = dict.fromkeys(list_of_threads, progress_start_value)
or manually:
thread_progress = {thread: progress_value, ...}
without further precaution, as long as no thread changes the dict keys.
This way you can track the progress of multiple threads with one dict. The only condition is not to change the dict once the threads have started, which means the dict must contain all threads BEFORE the first child thread starts; otherwise you must acquire a Lock before writing to the dict. By "changing the dict" I mean all operations on the keys. You may change the value associated with a key, because that's at the next level of indirection.
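A minimal sketch of the pre-initialized dict idea, assuming three hypothetical worker threads and a small N so it finishes quickly. Each worker looks itself up with threading.current_thread() and only reassigns the value under its own existing key; the main thread only reads.

```python
import threading
import time

N = 50  # assumed small count so the sketch finishes quickly


def worker():
    me = threading.current_thread()
    for count in range(1, N + 1):
        time.sleep(0.001)
        thread_progress[me] = count  # only updates the value of an existing key


threads = [threading.Thread(target=worker) for _ in range(3)]
thread_progress = dict.fromkeys(threads, 0)  # every key exists BEFORE any thread starts

for t in threads:
    t.start()
while any(t.is_alive() for t in threads):
    time.sleep(0.02)
    print([thread_progress[t] for t in threads])  # main thread only reads
for t in threads:
    t.join()
print([thread_progress[t] for t in threads])  # prints [50, 50, 50]
```

The dict is built after the Thread objects exist but before any of them is started, which is exactly the ordering the paragraph above requires.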
Update:
The underlying problem is shared state, which is already a problem in linear programs, but a nightmare in concurrent ones.
For example: imagine a global (shared) variable sv and two functions G(ood) and B(ad) in a linear program. Both functions calculate a result depending on sv, but B unintentionally changes sv. Now you are wondering why the heck G doesn't do what it should, even though you can't find any error in G, even after testing it in isolation, where it worked perfectly.
Now imagine the same scenario in a concurrent program, with two threads A and B. Both threads increment the shared state/variable sv by one.
Without locking (current value of sv in parentheses):
sv = 0
A reads sv (0)
B reads sv (0)
A inc sv (0)
B inc sv (0)
A writes sv (1)
B writes sv (1)
sv == 1 # should be 2!
Finding the source of this problem is a pure nightmare, because the code may also succeed sometimes. More often than not, A actually finishes before B even starts to read sv, so the problem just seems non-deterministic or erratic and is even harder to find. In contrast to my linear example, both threads are "good", but they nevertheless don't behave as intended.
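The unlocked interleaving above can be reproduced on purpose. In this sketch the threading.Barrier is an assumption added only to force the "both read before either writes" ordering from the trace, so the lost update happens every time instead of occasionally.

```python
import threading

sv = 0
barrier = threading.Barrier(2)  # forces the interleaving from the trace


def increment():
    global sv
    local = sv          # read sv (0 in both threads)
    barrier.wait()      # neither thread writes until both have read
    sv = local + 1      # increment and write back (1 in both threads)


threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sv)  # prints 1, not 2: one update was lost
```

Without the barrier, the same code would usually print 2, which is exactly why this kind of bug is so hard to track down.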
With locking:
sv = 0
l = lock (for access on sv)
A tries to acquire lock for sv -> success (0)
B tries to acquire lock for sv -> failure, blocked by A (0)
A reads sv (0)
B blocked (0)
A inc sv (0)
B blocked (0)
A writes sv (1)
B blocked (1)
A releases lock on sv (1)
B tries to acquire lock for sv -> success (1)
...
sv == 2
I hope my little example explains the underlying problem of accessing shared state, and why the whole read-modify-write sequence (not just the write) must be made atomic through locking.
Regarding my advice of a pre-initialized dict: this is merely a precaution, for two reasons:
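The locked version of the trace maps directly onto threading.Lock. This is a minimal sketch, assuming two threads that each increment sv 10000 times; holding the lock around the whole sv += 1 makes the read, increment and write one indivisible step.

```python
import threading

sv = 0
sv_lock = threading.Lock()


def increment(times):
    global sv
    for _ in range(times):
        with sv_lock:  # read, increment and write now happen as one unit
            sv += 1


threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sv)  # prints 20000
```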
First, if you iterate over the dict in a for loop while another thread adds or removes an entry, the loop may raise an exception (RuntimeError: dictionary changed size during iteration), because it is now unclear what the next key should be.
Second, suppose thread A reads the dict and is interrupted by thread B, which adds an entry and finishes. Thread A resumes, but still works with the dict as it was before B's change, and writes that pre-B state back together with its own changes. Thread B's changes are lost.
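The first reason is easy to observe. This sketch mutates the dict from the same thread that is iterating over it, purely so the result is deterministic; a second thread adding a key at that moment would trigger the same check.

```python
progress = {"A": 10, "B": 20}

# Another thread adding a key behind the loop's back has the same effect as
# this (deliberately single-threaded, hence deterministic) mutation.
error = None
try:
    for name in progress:
        progress["C"] = 0
except RuntimeError as exc:
    error = exc
print(error)  # dictionary changed size during iteration

# Looping over a snapshot avoids the error, because the loop no longer
# walks the live dict while it grows.
for name, value in list(progress.items()):
    progress[name + "-copy"] = value
print(sorted(progress))
```

With other threads writing concurrently, taking that snapshot while holding the same Lock the writers use is the cautious choice.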
By the way, my proposed solution wouldn't work as written: ints are immutable, so rebinding a thread's local progress variable never updates the value stored in the dict. This could easily be fixed by making the value mutable, e.g. by wrapping it in a list or a special Progress object, or, even simpler, by giving the thread function access to the thread_progress dict itself.
Explanation by example:
from threading import Thread

thread_progress = {}
t = Thread()
progress = 0                   # progress points to the object `0`
thread_progress[t] = progress  # thread_progress[t] now points to object `0`
progress = 1                   # progress is rebound to object `1`
thread_progress[t]             # thread_progress[t] still points to object `0`
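Better:
from threading import Thread

thread_progress = {}
t = Thread()
t.progress = 0                # attach a progress attribute to the thread object
thread_progress[t.name] = t   # the dict holds the thread object itself
t.progress = 1                # update the attribute on that same object
thread_progress[t.name].progress == 1   # True: both refer to one object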
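Putting the "better" variant into a running thread could look like the following sketch. ProgressThread is a hypothetical Thread subclass, and the count of 100 with 1 ms sleeps is an arbitrary assumption; the worker is the only writer of self.progress and the main thread only reads it through the dict.

```python
import threading
import time


class ProgressThread(threading.Thread):
    """Hypothetical Thread subclass that publishes its progress as an attribute."""

    def __init__(self, n):
        super().__init__()
        self.n = n
        self.progress = 0          # only this thread writes it; others just read

    def run(self):
        for count in range(1, self.n + 1):
            time.sleep(0.001)
            self.progress = count  # readers see the new value on the shared object


t = ProgressThread(100)
thread_progress = {t.name: t}      # the dict stores the thread object, not an int
t.start()
while t.is_alive():
    time.sleep(0.02)
    print(thread_progress[t.name].progress)
t.join()
print(thread_progress[t.name].progress)  # prints 100
```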
Upvotes: 3
Reputation: 880957
Just use a global variable and a threading.Lock to protect it during assignments:
import threading
from time import sleep

N = 1000
value = 0

def fast(lock):
    global value
    count = 0
    while count < N:
        sleep(.02)
        count += 1
        with lock:
            value = count

def slow():
    while 1:
        sleep(5)  # sleep for a long time
        print(value)  # read current value
        if value == N:
            break

if __name__ == "__main__":
    lock = threading.Lock()
    fast_thread = threading.Thread(target=fast, args=(lock,))
    fast_thread.start()
    slow()
    fast_thread.join()
yields something like:
249
498
747
997
1000
As Don Question points out, if there is only one thread modifying value, then no lock is actually needed in the fast function. And as dano points out, if you want to ensure that the value printed in slow is the same value used in the if-statement, then a lock is needed in the slow function.
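A minimal sketch of that second point, assuming a smaller N and shorter sleeps than the answer's code so it runs quickly: slow takes one snapshot of value while holding the lock and then uses that same local copy for both the print and the comparison.

```python
import threading
import time

N = 100  # assumed small so the sketch runs quickly
value = 0
lock = threading.Lock()


def fast():
    global value
    for count in range(1, N + 1):
        time.sleep(0.001)
        with lock:
            value = count


def slow():
    while True:
        time.sleep(0.02)
        with lock:
            current = value    # one snapshot, taken while no write is in progress
        print(current)         # the printed value ...
        if current == N:       # ... is the same value that is tested here
            return current


fast_thread = threading.Thread(target=fast)
fast_thread.start()
last = slow()
fast_thread.join()
```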
For more on when locks are needed, see Thread Synchronization Mechanisms in Python.
Upvotes: 3
Reputation: 29740
Just use a deque with a maximum length of 1. It will keep only your latest value: appending to a full bounded deque discards the item at the other end.
So, instead of:
q = Queue()
use:
from collections import deque
q = deque(maxlen=1)
To read from the deque, there's no get method, so you'll have to do something like:
val = None
try:
    val = q[0]
except IndexError:
    pass  # nothing has been appended yet
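Put together with the question's fast/slow structure, the deque approach might look like this sketch. latest() is a hypothetical helper wrapping the peek shown above, and N and the sleep intervals are assumptions chosen so the example finishes quickly.

```python
import threading
import time
from collections import deque

N = 100                     # assumed small so the sketch runs quickly
q = deque(maxlen=1)         # appending to a full deque drops the older item


def fast():
    for count in range(1, N + 1):
        time.sleep(0.001)
        q.append(count)     # replaces the previous value


def latest(dq, default=None):
    try:
        return dq[0]        # peek: the value stays in the deque
    except IndexError:
        return default      # nothing has been appended yet


fast_thread = threading.Thread(target=fast)
fast_thread.start()
val = None
while val != N:
    time.sleep(0.02)
    val = latest(q)
    print(val)
fast_thread.join()
```

Unlike the original Queue version, the reader never drains anything, so repeated reads between appends simply return the same latest value.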
Upvotes: 3