Mike Me

Reputation: 2392

Iterate Dictionary using Multithreading in Python

I want to iterate through nested dictionaries inside a dictionary (simulating the structure of a directory tree or a website) using multithreading and a Queue (to limit the number of threads) in Python. I created mainDict to simulate this:

mainDict = {"Layer1": {"Layer11": 1, "Layer12": 1, "Layer13": 1, "Layer14": 1, "Layer15": 1, "Layer16": 1},
        "Layer2": {"Layer21": 2, "Layer22": 2, "Layer23": 2, "Layer24": 2, "Layer25": 2, "Layer26": 2},
        "Layer3": {"Layer31": 4, "Layer32": 4, "Layer33": 4, "Layer34": 4, "Layer35": 4, "Layer36": 4},
        "Layer4": {"Layer41": 8, "Layer42": 8, "Layer43": 8, "Layer44": 8, "Layer45": 8, "Layer46": 8},
        "Layer5": {"Layer51": 16, "Layer52": 16, "Layer53": 16, "Layer54": 16, "Layer55": 16, "Layer56": 16},
        "Layer6": {"Layer61": 32, "Layer62": 32, "Layer63": 32, "Layer64": 32, "Layer65": 32, "Layer66": 32}}

and a Crawler class to instantiate a crawler for each first-level subdictionary of mainDict.

The idea is that I want to create 2 threads (a limited number of threads/crawlers at a time, to reduce CPU usage) that can crawl Layer(i) (i = 1..6). Each thread crawls until it reaches the leaves of the "tree", then moves on to the next dictionary (e.g. crawler 0 goes through Layer1 while crawler 1 goes through Layer2; whichever finishes first picks up Layer3, and so on).

import threading
import time
from queue import Queue

class Crawler:
    def __init__(self, rootDict, number_of_delay, crawler):
        self.crawler = crawler
        self.rootDict = rootDict
        self.number_of_delay = number_of_delay

    def crawlAllLeaves(self, myDict):
        for k, v in myDict.items():
            if isinstance(v, dict):
                print("Crawler {} is crawling {}".format(self.crawler, k))
                self.crawlAllLeaves(v)
            else:
                print("Crawler {} reached the value {} for key {}".format(self.crawler, v, k))
                time.sleep(self.number_of_delay + v)

def someAuxFunc():
    # to simulate some loading time
    time.sleep(2)

def createWorker(q, delayNumber, crawler):
    tc = Crawler(mainDict[q.get()], delayNumber, crawler)
    tc.crawlAllLeaves(tc.rootDict)

def threader(q, delayNumber, crawler):
    while True:
        print("crawler {}: has gotten the url {}".format(crawler, q.get())) 
        createWorker(q, delayNumber, crawler)
        print("crawler {}: has finished the url {}".format(crawler, q.get())) 
        q.task_done()

q = Queue()
number_of_threads = 2
delayNumber = 2

for thread in range(number_of_threads):
    th = threading.Thread(target=threader, args=(q, delayNumber, thread,))
    th.setDaemon(True)
    th.start()


for key, value in mainDict.items():
    someAuxFunc()
    print("QUEUING {}".format(key))
    q.put(key)

q.join()

I have 2 problems:

  1. It creates only 2 threads and gets the first 2 elements (subdictionaries) from the queue, then it does nothing more; it doesn't even quit, it just hangs
  2. the threader() function reports getting one subdictionary but iterates over a different one, as shown by the prints in crawlAllLeaves()

Can you help me with this one as I'd like to learn Python and threading and I don't know what I am doing wrong?

Upvotes: 1

Views: 1190

Answers (1)

Hannu

Reputation: 12205

Your problem is in the handling of the queue, and it explains both of your issues: you keep reading from the queue instead of using the value you actually received from it. Take a look at this fixed code:

def createWorker(bar, delayNumber, crawler):
    tc = Crawler(mainDict[bar], delayNumber, crawler)
    tc.crawlAllLeaves(tc.rootDict)

def threader(q, delayNumber, crawler):
    while True:
        foo = q.get()
        print("crawler {}: has gotten the url {}".format(crawler, foo)) 
        createWorker(foo, delayNumber, crawler)
        print("crawler {}: has finished the url {}".format(crawler, foo)) 
        q.task_done()

In threader we now read from the queue once into a variable and then pass that variable to createWorker, which uses it instead of calling get() again.

Your original code first gets a value from the queue in the first print statement; it prints that value and then discards it. Then you call createWorker, which receives the *next* value from the queue and starts working on that. Finally, the second print statement gets yet another value from the queue and prints it. None of the values shown in your print statements is the one actually processed by createWorker.
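To see why, remember that every call to Queue.get() removes and returns a *different* item; a minimal sketch (with made-up keys matching the question):

    from queue import Queue

    q = Queue()
    for key in ["Layer1", "Layer2", "Layer3"]:
        q.put(key)

    # Each get() consumes one item; three calls yield three different keys.
    print(q.get())  # Layer1
    print(q.get())  # Layer2
    print(q.get())  # Layer3

So three get() calls in one loop iteration, as in the original threader, consume three separate work items.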

Queue.get() blocks by default if the queue is empty. Since you consume three values for every one actually processed, the output is essentially arbitrary, and definitely not what you intend. Your code then blocks in the final q.join() because you called get() three times per task but task_done() only once, so join() assumes there are still tasks in progress.
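The join()/task_done() contract can be shown in isolation; a minimal sketch (simplified worker, not the Crawler from the question):

    from queue import Queue
    import threading

    q = Queue()
    for i in range(3):
        q.put(i)

    def worker():
        while True:
            item = q.get()   # exactly one get() per work item...
            # ...process item here...
            q.task_done()    # ...matched by exactly one task_done()

    threading.Thread(target=worker, daemon=True).start()
    q.join()  # returns once task_done() has been called for every put()
    print("all done")

With one task_done() per get(), join() returns and the program exits cleanly; break that one-to-one pairing and join() blocks forever.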

Upvotes: 2
