I am collecting data from several devices, and since the tests run for a long time, I want to use Python's threading and Queue modules. I've written a short script to figure out how to use these, and it is evident that I don't understand the nuances of getting this to work.
Here is my script:
import ue9
import LJ_Util
import DAQ_Util
import threading
import Queue
from datetime import datetime
from time import sleep

queue = Queue.Queue()
now = datetime.now().isoformat()

def DAQThread(ue9ipAddr):
    print '\nExecuting in DAQThread at IP Address: %s' % ue9ipAddr
    a = ue9.UE9(ethernet=True, ipAddress=ue9ipAddr)
    SN = (a.commConfig()).get('SerialNumber')
    count = 5
    while count > 0:
        reading = a.feedback()
        dataReturn = (SN, now, reading)
        queue.put(dataReturn)
        count -= 1
        print count
        sleep(5)

def listenThread(counter):
    while queue.empty() != True:
        try:
            outcome = queue.get()
            print outcome
            counter -=1
            print counter
        except:
            return 'queue.get() command loop failing.'

print "\nOpening device..."
ipAdd = '192.168.1.127'
feedbackThread = threading.Thread(target=DAQThread, args=(ipAdd,))
feedbackThread.start()

print "\nListening for data..."
queryThread = threading.Thread(target=listenThread, args = (10,))
queryThread.start()

print queue.get()
print(threading.activeCount())
print "\nDone"
Here is the output from executing this script:
$ python threading-and-queue-test.py
Opening device...
Executing in DAQThread at IP Address: 192.168.1.127
Listening for data...
4
(278956853, '2012-09-03T20:02:47.656024', {'AIN4': -0.012, 'AIN5': -0.012, 'CIODir': 0, 'AIN7': -0.012, 'EIODir': 0, 'AIN1': -0.012, 'AIN2': -0.012, 'AIN3': -0.012, 'MIOState': 7, 'AIN8': -0.012, 'AIN6': -0.012, 'AIN9': -0.012, 'CIOState': 15, 'AIN0': -0.012, 'Counter0': 0, 'Counter1': 0, 'EIOState': 255, 'TimerC': 0, 'TimerB': 0, 'TimerA': 0, 'MIODir': 0, 'FIODir': 32, 'AIN14': -0.012, 'AIN15': -0.012, 'AIN12': -0.012, 'AIN13': -0.012, 'AIN10': -0.012, 'AIN11': -0.012, 'FIOState': 255})
2
Done
3
2
1
0
$
It's clear the timing of the thread activities is 'off', but I'm not sure how to fix it, as I've never programmed with these modules before, nor used threads in general. Any comments or suggestions would be welcome. Thanks in advance!
Reputation: 6149
As mentioned in the comment, one problem is in your listening thread. Once you "grab" an entry from the queue, the queue is empty again, and since you only add an entry every 5 seconds, the listening thread drains the queue much faster than it is filled. The condition while queue.empty() != True then evaluates to False and the loop exits.
For example:
>>> import Queue
>>> q=Queue.Queue()
>>> q.put(1)
>>> q.empty()
False
>>> q.get()
1
>>> q.empty()
True
>>> q.empty()!=True
False
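The same thing happens at the thread level. As a minimal sketch (written for Python 3, where the module is called queue instead of Queue; naive_listener and demo are made-up names for illustration), a listener that loops on empty() returns right away if it starts before the first item arrives:

```python
import queue      # this module is named Queue in Python 2
import threading


def naive_listener(q, seen):
    # Same loop condition as in the question: stop as soon as the queue is empty.
    while not q.empty():
        seen.append(q.get())


def demo():
    q = queue.Queue()
    seen = []
    t = threading.Thread(target=naive_listener, args=(q, seen))
    t.start()
    t.join()                 # the listener has already returned: nothing was queued yet
    q.put('late reading')    # this item arrives after the listener is gone
    return seen, q.qsize()


if __name__ == '__main__':
    print(demo())
```

The listener sees nothing, and the item that arrives later just sits in the queue.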
One way to get around this is to use another queue as a stop or cancel queue. Modifying your listening thread, you could do something like this:
stopQue=Queue.Queue()

def listenThread(counter):
    while True:
        if queue.empty()!=True:
            outcome = queue.get()
            print outcome
            counter -=1
            print counter
        if stopQue.empty()!=True:
            break
    print 'Exiting Listening Thread'
That way, if you put anything in stopQue, e.g. stopQue.put(1), the listening thread should exit.
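A common alternative to polling two queues (a different technique from the stop queue shown here) is to let get() block and have the producer push a sentinel object once it is finished. The following is only a sketch under some assumptions: it is written for Python 3, where the module is named queue, the names _STOP, producer, consumer and run are hypothetical, and a short delay stands in for the sleep(5) and the device read.

```python
import queue      # this module is named Queue in Python 2
import threading
import time

_STOP = object()  # hypothetical sentinel meaning "no more data"


def producer(q, n, delay=0.01):
    # Stand-in for the DAQ thread: emit n readings, then signal the end.
    for count in range(n, 0, -1):
        q.put('data: %d' % count)
        time.sleep(delay)
    q.put(_STOP)


def consumer(q, results):
    while True:
        item = q.get()        # blocks until something is available
        if item is _STOP:
            break
        results.append(item)


def run(n=5):
    q = queue.Queue()
    results = []
    threads = [
        threading.Thread(target=consumer, args=(q, results)),
        threading.Thread(target=producer, args=(q, n)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()              # wait until both threads have finished
    return results


if __name__ == '__main__':
    print(run())
```

Because get() blocks, the consumer sleeps while the queue is empty instead of spinning in a while True loop, and since the sentinel travels through the same queue as the data, it cannot overtake a reading.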
Here is a complete example based on your initial code. I cut out everything that is not related to the queues and threading:
import threading
import Queue
from time import sleep

dataQue = Queue.Queue()
stopQue = Queue.Queue()

def DAQThread(ue9ipAddr):
    print 'Executing in DAQThread\n'
    count = 5
    while count > 0:
        dataQue.put('data: %s' % count)
        count -= 1
        sleep(5)
    stopQue.put(1)
    print 'Exiting DAQThread\n'

def listenThread(counter):
    while True:
        if dataQue.empty() != True:
            outcome = dataQue.get()
            print outcome
            counter -=1
        if stopQue.empty() != True:
            break
    print 'Exiting Listening Thread'

print "Opening device..."
ipAdd = '192.168.1.127'
feedbackThread = threading.Thread(target=DAQThread, args=(ipAdd,))
feedbackThread.setDaemon(True)
feedbackThread.start()

print "Listening for data..."
queryThread = threading.Thread(target=listenThread, args = (10,))
queryThread.setDaemon(True)
queryThread.start()

print "Done"
Produces the output:
>>>
Opening device...
Executing in DAQThread
Listening for data...
Donedata: 5 #Notice how the script is done however the threads are still running
>>> data: 4
data: 3
data: 2
data: 1
Exiting DAQThread
Exiting Listening Thread
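The transcript above comes from an interactive shell, which stays alive after "Done" is printed. If the same code is run as a standalone script, daemon threads are killed as soon as the main thread ends, so the main thread has to wait for them with join(). Below is a minimal sketch of that, assuming Python 3 (module queue) and using a threading.Event as the stop flag in place of a second queue; daq_worker, listener and main are hypothetical names, and the short delays stand in for sleep(5).

```python
import queue      # this module is named Queue in Python 2
import threading
import time


def daq_worker(data_q, stop_event, n=5, delay=0.01):
    # Stand-in for the DAQ thread: queue n readings, then raise the stop flag.
    for count in range(n, 0, -1):
        data_q.put('data: %d' % count)
        time.sleep(delay)
    stop_event.set()


def listener(data_q, stop_event, received):
    while True:
        # Read the flag BEFORE get(): if it was already set, every item has
        # been queued, so a following Empty means the queue is fully drained.
        stopping = stop_event.is_set()
        try:
            item = data_q.get(timeout=0.05)
        except queue.Empty:
            if stopping:
                break
            continue
        received.append(item)


def main(n=5):
    data_q = queue.Queue()
    stop_event = threading.Event()
    received = []
    threads = [
        threading.Thread(target=daq_worker, args=(data_q, stop_event, n)),
        threading.Thread(target=listener, args=(data_q, stop_event, received)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()              # keep the main thread alive until both are done
    return received


if __name__ == '__main__':
    print(main())
    print('Done')
```

With the join() calls in place, "Done" is printed only after both threads have exited, and the timeout on get() keeps the listener from spinning while it waits.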