Reputation: 145
I have a function that reads in files, groups them with some data and then puts them on a message queue to be sent over a socket by a thread:
for filename in os.listdir(os.getcwd()):
... read in files and sort numerically ...
roke_data = {"filename" : None, "byte_string" : None}
for filename in lst:
with open(filename, "r") as f:
roke_data["filename"] = filename
roke_data["byte_string"] = f.read()
fileQueue.put(roke_data)
fileQueue.join()
exit()
My thread's run function:
def run(self):
try:
self.connect_socket() #connects to a socket on localhost
roke_data = self.fileQueue.get_nowait()
print "Sending file: ", roke_data["filename"]
self.sc.send(roke_data["byte_string"])
except Queue.Empty:
time.sleep(0.1)
I've run tests to confirm that the fileQueue
is being filled and it seems to be getting filled with the correct number of elements, but it seems that only a few elements are actually pulled off of the queue and sent and whats worse is that some files are being repeated in the queue. For example, the last file in the set that I'm reading in seems to be pushed to the queue multiple times. What am I doing wrong?
Upvotes: 0
Views: 58
Reputation: 1688
In the first place I would put a new dict object for each file onto the queue, rather than reusing the same dict instance over and over. Also, you need to signal that no more items will be put onto the queue, e.g. by putting None
:
for filename in lst:
with open(filename, "r") as f:
roke_data = dict(filename=filename, byte_string=f.read())
self.fileQueue.put(roke_data)
self.fileQueue.join()
self.fileQueue.put(None) # the kill pill
Second, I can't see any loop in your consumer thread method. You are just getting a single item. Is your code snippet missing something? Have a look at this example to see how elements can be consumed from a Queue.
It breaks down to a loop and a blocking call to get()
:
def run(self):
self.connect_socket() #connects to a socket on localhost
while True:
roke_data = self.fileQueue.get() # blocking!
if roke_data is None:
break
print("Sending file: ", roke_data["filename"])
self.sc.send(roke_data["byte_string"])
# Assuming it's a JoinableQueue
self.fileQueue.task_done()
Upvotes: 1