Reputation: 2392
I want to iterate through dictionaries in a dictionary (simulating the structure of a directory or a website) using multithreading and a Queue (to limit the number of threads) in Python. I created the following mainDict to simulate this:
mainDict = {"Layer1": {"Layer11": 1, "Layer12": 1, "Layer13": 1, "Layer14": 1, "Layer15": 1," Layer16": 1},
"Layer2": {"Layer21": 2, "Layer22": 2, "Layer23": 2, "Layer24": 2, "Layer25": 2, "Layer26": 2},
"Layer3": {"Layer31": 4, "Layer32": 4, "Layer33": 4, "Layer34": 4, "Layer35": 4, "Layer36": 4},
"Layer4": {"Layer41": 8, "Layer42": 8, "Layer43": 8, "Layer44": 8, "Layer45": 8, "Layer46": 8},
"Layer5": {"Layer51": 16, "Layer52": 16, "Layer53": 16, "Layer54": 16, "Layer55": 16, "Layer56": 16},
"Layer6": {"Layer61": 32, "Layer62": 32, "Layer63": 32, "Layer64": 32, "Layer65": 32, "Layer66": 32}}
I also created a Crawler class that instantiates a crawler for each first-level subdictionary of mainDict.
The idea is to create 2 threads (a limited number of threads/crawlers at a time, to reduce CPU usage) that crawl Layer(i) (i=1..6). Each thread crawls until it reaches the leaves of the "tree", then moves on to the next dictionary (e.g. crawler 0 goes through Layer1 and crawler 1 through Layer2; whichever finishes moves on to Layer3, and so on).
import threading
import time
from queue import Queue

class Crawler:
    def __init__(self, rootDict, number_of_delay, crawler):
        self.crawler = crawler
        self.rootDict = rootDict
        self.number_of_delay = number_of_delay

    def crawlAllLeaves(self, myDict):
        for k, v in myDict.items():
            if isinstance(v, dict):
                print("Crawler {} is crawling {}".format(self.crawler, k))
                self.crawlAllLeaves(v)
            else:
                print("Crawler {} reached the value {} for key {}".format(self.crawler, v, k))
                time.sleep(self.number_of_delay + v)

def someAuxFunc():
    # to simulate some loading time
    time.sleep(2)

def createWorker(q, delayNumber, crawler):
    tc = Crawler(mainDict[q.get()], delayNumber, crawler)
    tc.crawlAllLeaves(tc.rootDict)

def threader(q, delayNumber, crawler):
    while True:
        print("crawler {}: has gotten the url {}".format(crawler, q.get()))
        createWorker(q, delayNumber, crawler)
        print("crawler {}: has finished the url {}".format(crawler, q.get()))
        q.task_done()

q = Queue()
number_of_threads = 2
delayNumber = 2

for thread in range(number_of_threads):
    th = threading.Thread(target=threader, args=(q, delayNumber, thread,))
    th.setDaemon(True)
    th.start()

for key, value in mainDict.items():
    someAuxFunc()
    print("QUEING {}".format(key))
    q.put(key)

q.join()
I have 2 problems:

1. The keys the crawlers print as "gotten" and "finished" are not the ones they actually crawl.
2. The program never finishes; it hangs at the final q.join() even after all the work seems to be done.

Can you help me with this? I'd like to learn Python and threading, and I don't know what I am doing wrong.
Upvotes: 1
Views: 1190
Reputation: 12205
Your problem is in the handling of the queue, and it explains both of your issues. You keep reading from the queue instead of using the value you actually received from it. Take a look at this (fixed) code:
def createWorker(bar, delayNumber, crawler):
    tc = Crawler(mainDict[bar], delayNumber, crawler)
    tc.crawlAllLeaves(tc.rootDict)

def threader(q, delayNumber, crawler):
    while True:
        foo = q.get()
        print("crawler {}: has gotten the url {}".format(crawler, foo))
        createWorker(foo, delayNumber, crawler)
        print("crawler {}: has finished the url {}".format(crawler, foo))
        q.task_done()
In your threader we now read the queue once into a variable and then pass that variable to createWorker. In your createWorker you use this value instead of getting another one from the queue.
Your original code first gets a value from the queue in the first print statement. It prints the value and then discards it. Then you call createWorker, where you get the next value from the queue and start work on that. Finally, the second print statement gets yet another value from the queue and prints that. None of the values shown in the print statements are the ones actually passed to createWorker.
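To make this concrete, here is the threader from the question again, annotated with comments marking the three separate reads from the queue:

def threader(q, delayNumber, crawler):
    while True:
        # 1st get(): this key is only printed, then thrown away
        print("crawler {}: has gotten the url {}".format(crawler, q.get()))
        # 2nd get() happens inside createWorker (mainDict[q.get()]):
        # this is the key that actually gets crawled
        createWorker(q, delayNumber, crawler)
        # 3rd get(): yet another key, printed as "finished" but never crawled
        print("crawler {}: has finished the url {}".format(crawler, q.get()))
        # only one task_done() for the three get() calls above
        q.task_done()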
Queue.get() blocks by default if there is nothing in the queue. Since you take three values from the queue for every one you process, the output is more or less arbitrary and definitely not what you intend it to be. Your code also blocks in the final q.join(): you have called get() three times per loop but task_done() only once, so join() assumes there are still tasks in progress and waits forever.
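If it helps, here is a minimal, self-contained sketch (the names are made up for illustration) of the bookkeeping that join() relies on: every put() must eventually be matched by exactly one task_done(), otherwise join() never returns.

import threading
from queue import Queue

def worker(q):
    while True:
        item = q.get()          # take exactly one item per loop
        print("processing", item)
        q.task_done()           # mark that one item as finished

q = Queue()
threading.Thread(target=worker, args=(q,), daemon=True).start()

for item in ["Layer1", "Layer2", "Layer3"]:
    q.put(item)                 # each put() increments the count of unfinished tasks

q.join()                        # returns only once every put() has a matching task_done()
print("all done")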
Upvotes: 2