Red Spanner
Red Spanner

Reputation: 149

Correct usage of multithreading and Queue module in data collection application written in Python

I am working on collecting data from several devices, and since the tests are long duration, I want to use Python's threading and Queue modules. I've written a short script to figure out how to use these, and it is very evident I don't understand the nuances of getting this to work.

Here is my script:

import ue9
import LJ_Util
import DAQ_Util
import threading
import Queue

from datetime import datetime
from time import sleep

queue = Queue.Queue()
now = datetime.now().isoformat()

def DAQThread(ue9ipAddr):
    print '\nExecuting in DAQThread at IP Address: %s' % ue9ipAddr
    a = ue9.UE9(ethernet=True, ipAddress=ue9ipAddr)
    SN = (a.commConfig()).get('SerialNumber')
    count = 5
    while count > 0: 
        reading = a.feedback()
        dataReturn = (SN, now, reading)
        queue.put(dataReturn)
        count -= 1
        print count
        sleep(5)


def listenThread(counter): 
    while queue.empty() != True:
        try: 
            outcome = queue.get() 
            print outcome 
            counter -=1 
            print counter 
        except: 
            return 'queue.get() command loop failing.' 


print "\nOpening device..."
ipAdd = '192.168.1.127'
feedbackThread = threading.Thread(target=DAQThread, args=(ipAdd,))
feedbackThread.start()

print "\nListening for data..."
queryThread = threading.Thread(target=listenThread, args = (10,))
queryThread.start()

print queue.get()
print(threading.activeCount())
print "\nDone"

Here is the output results from executing this script:

$ python threading-and-queue-test.py

Opening device...

Executing in DAQThread at IP Address: 192.168.1.127
Listening for data...

4
 (278956853, '2012-09-03T20:02:47.656024', {'AIN4': -0.012, 'AIN5': -0.012, 'CIODir': 0, 'AIN7': -0.012, 'EIODir': 0, 'AIN1': -0.012, 'AIN2': -0.012, 'AIN3': -0.012, 'MIOState': 7, 'AIN8': -0.012, 'AIN6': -0.012, 'AIN9': -0.012, 'CIOState': 15, 'AIN0': -0.012, 'Counter0': 0, 'Counter1': 0, 'EIOState': 255, 'TimerC': 0, 'TimerB': 0, 'TimerA': 0, 'MIODir': 0, 'FIODir': 32, 'AIN14': -0.012, 'AIN15': -0.012, 'AIN12': -0.012, 'AIN13': -0.012, 'AIN10': -0.012, 'AIN11': -0.012, 'FIOState': 255})
2

Done
3
2
1
0
$

It's clear the timing of the thread activities are 'off', but I'm not sure how to fix it, as I've never programmed with these modules before, nor used threads in general. Any comments or suggestions would be welcome. Thanks in advance!

Upvotes: 3

Views: 237

Answers (1)

Onlyjus
Onlyjus

Reputation: 6149

As mentioned in the comment, one problem is in your listening thread. Once you "grab" an entry from queue, there are no longer any entries in queue and since you are only adding an entry every 5 seconds, your listening thread will empty the queue thus

while queue.empty() != True will evaluate False, exiting the loop

For example:

>>> import Queue
>>> q=Queue.Queue()
>>> q.put(1)
>>> q.empty()
False
>>> q.get()
1
>>> q.empty()
True
>>> q.empty()!=True
False

One way to get around this is to use another queue as a stop or cancel queue so modifying you listening thread you could do something like this:

stopQue=Queue.Queue()

def listenThread(counter): 
    while True:
        if queue.empty()!=True: 
            outcome = queue.get() 
            print outcome 
            counter -=1 
            print counter 
        if stopQue.empty()!=True:
            break
    print 'Exiting Listening Thread'

That way if you put anything in the stopQue, i.e. stopQue.put(1), it should exit.

Complete example based on your initial code. I cut out the code that does not have to do with the Queues and threading:

import threading
import Queue
from time import sleep

dataQue = Queue.Queue()
stopQue = Queue.Queue()

def DAQThread(ue9ipAddr):
    print 'Executing in DAQThread\n'
    count = 5
    while count > 0: 
        dataQue.put('data: %s' % count)
        count -= 1
        sleep(5)
    stopQue.put(1)
    print 'Exiting DAQThread\n'


def listenThread(counter): 
    while True:
        if dataQue.empty() != True:
            outcome = dataQue.get() 
            print outcome 
            counter -=1 
        if stopQue.empty() != True:
            break
    print 'Exiting Listening Thread'


print "Opening device..."
ipAdd = '192.168.1.127'
feedbackThread = threading.Thread(target=DAQThread, args=(ipAdd,))
feedbackThread.setDaemon(True)
feedbackThread.start()

print "Listening for data..."
queryThread = threading.Thread(target=listenThread, args = (10,))
queryThread.setDaemon(True)
queryThread.start()

print "Done"

Produces the output:

>>> 
Opening device...
Executing in DAQThread
Listening for data...

Donedata: 5        #Notice how the script is done however the threads are still running

>>> data: 4
data: 3
data: 2
data: 1
Exiting DAQThread
Exiting Listening Thread

Upvotes: 2

Related Questions