Shaokan
Shaokan

Reputation: 7684

python multithreading question

Can anyone tell me why this code generates queue after starting the threads? Basically, queue is generated after the for loop but in ThreadUrl class it already uses queue.get() method. How does this work? How can it get the values from a queue that is not yet generated?

for i in range(5):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

# This is what confuses me! Shouldn't it be above the for loop??
for host in hosts:
    queue.put(host)

for i in range(5):
    dt = DatamineThread(out_queue)
    dt.setDaemon(True)
    dt.start()

#wait on the queue until everything has been processed
queue.join()
out_queue.join()

Here is the full source

import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)

Upvotes: 0

Views: 282

Answers (1)

Roman Bodnarchuk
Roman Bodnarchuk

Reputation: 29707

Line host = self.queue.get() blocks executing thread until some element appear in the queue.

So

#spawn a pool of threads, and pass them queue instance
for i in range(5):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

creates 5 threads that are waiting for any element in the queue.

#populate queue with data
for host in hosts:
    queue.put(host)

fills the queue. After this threads start their processing.

Upvotes: 6

Related Questions