utengr

Reputation: 3355

Multi-threading in Python: Getting stuck at last thread

I have a strange situation and cannot figure it out after a lot of trial and error. I am using multi-threading (10 threads) to read URLs (100 of them), and it works fine in most cases, but in some situations it gets stuck at the last thread. I waited to see if it would return, and it took a very long time (1050 seconds), whereas the other nine threads returned within 25 seconds. This suggests something is wrong with my code, but I can't figure out what. Any ideas?

Note1: It happens for both daemon and non-daemon threads.

Note2: The number of URLs and threads varies. I tried anywhere from 10 to 100 URLs and from 5 to 50 threads.

Note3: The URLs are completely different most of the time.

import urllib2
import Queue
import threading
from goose import Goose

input_queue = Queue.Queue()
result_queue = Queue.Queue()

Thread Worker:

def worker(input_queue, result_queue):
    queue_full = True
    while queue_full:
        try:
            url = input_queue.get(False)
            # fetch the page with urllib2 and extract the article with Goose
            html = urllib2.urlopen(url).read()
            article = Goose().extract(raw_html=html)
            result_queue.put(article.cleaned_text)
        except Queue.Empty:
            queue_full = False

Main process:

for url in urls:
    input_queue.put(url)
thread_count = 5 
for i in range(thread_count):
    t = threading.Thread(target=worker, args=(input_queue, result_queue))
    t.start()

for url in urls:
    url = result_queue.get() # updates url   

The process gets blocked at the last result_queue.get() call.

NOTE: I am more interested in what I am doing wrong here, in case someone can point that out, because I tend to think that I wrote correct code, but apparently that's not the case.
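A timed get in the final loop at least turns the silent hang into a visible exception (a debugging sketch, not the original code; the 60-second timeout is an arbitrary choice):

for url in urls:
    try:
        url = result_queue.get(timeout=60)  # updates url, or raises after 60s
    except Queue.Empty:
        print 'a worker died without delivering a result'
        break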

Upvotes: 7

Views: 6463

Answers (2)

stamaimer

Reputation: 6495

You can use ThreadPoolExecutor from concurrent.futures (standard library in Python 3; on Python 2 it is available as the futures backport).

from concurrent.futures import ThreadPoolExecutor
import requests

MAX_WORKERS = 50

def worker(url):
    response = requests.get(url)
    return response.content

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    results = executor.map(worker, urls)

for result in results:
    print(result)
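If a single URL can hang or fail, executor.map raises the first exception when you iterate the results and keeps input order; a variant with submit and as_completed handles each URL independently (the timeout value and error handling below are illustrative assumptions, not part of the answer above):

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # one future per URL; as_completed yields them in finish order
    futures = {executor.submit(requests.get, url, timeout=30): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            print(url, len(future.result().content))
        except Exception as exc:  # a slow or broken URL no longer blocks the rest
            print(url, 'failed:', exc)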

Upvotes: 2

Kallz

Reputation: 3523

For example, I take the URLs as a list of numbers:

import urllib2
import Queue
import threading
#from goose import Goose

input_queue = Queue.Queue()
result_queue = Queue.Queue()

def worker(input_queue, result_queue):
    while not input_queue.empty():
        try:
            url = input_queue.get(False)
            updated_value = int(url) * 9
            result_queue.put(updated_value)
        except Queue.Empty:
            pass

urls = [1,2,3,4,5,6,7,8,9]

for url in urls:
    input_queue.put(url)

thread_count = 5

threads = []
for i in range(thread_count):
    t = threading.Thread(target=worker, args=(input_queue, result_queue))
    t.start()
    threads.append(t)

# join after starting all threads, so the workers actually run concurrently
for t in threads:
    t.join()

for url in urls:
    print result_queue.get()

Output (the order may vary, since the workers run concurrently)

9
18
27
36
45
54
63
72
81
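As for why the code in the question blocks: if a worker hits any exception other than Queue.Empty (a timeout, an urllib2.URLError, a Goose parsing error), that thread dies without putting anything on result_queue, so one of the result_queue.get() calls in the main loop waits forever. A worker that always answers, one result per URL, keeps the gets and puts balanced (a sketch; the timeout and error handling are illustrative assumptions):

def safe_worker(input_queue, result_queue):
    while True:
        try:
            url = input_queue.get(False)
        except Queue.Empty:
            break  # queue drained, exit cleanly
        try:
            html = urllib2.urlopen(url, timeout=30).read()
            result_queue.put((url, html))
        except Exception as e:
            # put one item per URL even on failure, so main never blocks
            result_queue.put((url, e))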

Upvotes: 2
