skyguy126

Reputation: 467

Multiprocessing Queue.get() hangs

I'm trying to implement basic multiprocessing and I've run into an issue. The Python script is shown below.

import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

The AnalyzeFrequency class analyzes the frequencies of words in a file, and get_results() returns a sorted list of those words and their frequencies. The list is very large, perhaps 10,000 items.
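For context, here is a minimal sketch of what AnalyzeFrequency might look like; the real class isn't shown in the question, so this is only an illustration of the assumed interface (analyze(), get_results(), and an _time attribute):

import time
from collections import Counter

# Hypothetical stand-in for the real AnalyzeFrequency class, for illustration only.
class AnalyzeFrequency:
    def __init__(self, path):
        self.path = path
        self._time = -1
        self._results = []

    def analyze(self):
        start = time.time()
        with open(self.path) as f:
            counts = Counter(f.read().split())
        # [word, count] pairs, most frequent first
        self._results = [[w, c] for w, c in counts.most_common()]
        self._time = time.time() - start

    def get_results(self):
        return self._results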

The results list is then passed to the add_to_append_queue method, which adds it to a queue. process_append_queue takes the items one by one and adds the frequencies to a "database". This operation takes a bit longer than the actual analysis in main(), so I am trying to use a separate process for it. When I do this with the threading module, everything works perfectly fine with no errors. When I use Process, the script hangs at item = append_queue.get().

Could someone please explain what is happening here, and perhaps direct me toward a fix?

All answers appreciated!

UPDATE

The pickle error was my fault; it was just a typo. Now I am using the Queue class from multiprocessing, but the append_queue.get() call still hangs. New code:

import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

UPDATE 2

This is the database code:

class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()

        try:
            file = open("results.txt", 'r')
        except:
            raise IOError

        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()

        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()

        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)

        try:
            file = open("results.txt", 'w')
        except:
            raise IOError

        file.truncate(0)
        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()

        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()

        _list = self.sorter.mergesort(_list)
        _list.reverse()

        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()

        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]

        self.append_time = time.time() - start_time

Upvotes: 13

Views: 16907

Answers (2)

Tim Peters

Reputation: 70602

Comments suggest you're trying to run this on Windows. As I said in a comment,

If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.

Here's a fleshed-out version of what you need to do to make it portable, although I removed all the database stuff because it's irrelevant to the problems you've described so far. I also removed the daemon fiddling, because that's usually just a lazy way to avoid shutting things down cleanly, and as often as not will come back to bite you later:

def process_append_queue(append_queue):
    while True:
        x = append_queue.get()
        if x is None:
            break
        print("processed %d" % x)
    print("worker done")

def main():
    import multiprocessing as mp

    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()
    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__=="__main__":
    main()

The output is the "obvious" stuff:

processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done

Note: because Windows doesn't (can't) fork(), it's impossible for worker processes to inherit any Python object on Windows. Each process runs the entire program from its start. That's why your original program couldn't work: each process created its own Queue, wholly unrelated to the Queue in the other process. In the approach shown above, only the main process creates a Queue, and the main process passes it (as an argument) to the worker process.
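Applied to the original program, the same pattern might look roughly like the sketch below. It assumes FrequencyStore and AnalyzeFrequency behave as described in the question; note that the database now lives entirely inside the worker process, because appends made in a child process would not be visible to a FrequencyStore object in the parent:

import sys
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

def process_append_queue(append_queue):
    # The worker owns the database: it loads it, applies every batch of
    # results pulled off the queue, and saves it before exiting.
    database = FrequencyStore()
    database.load_db()
    while True:
        results = append_queue.get()
        if results is None:          # sentinel: no more work
            break
        database.append(results)
        print("Appended to database in %.4f seconds" % database.append_time)
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)

def main():
    append_queue = Queue(10)
    worker = Process(target=process_append_queue, args=(append_queue,))
    worker.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        append_queue.put(a.get_results())

    append_queue.put(None)   # tell the worker we're done
    worker.join()            # wait for it to finish saving
    sys.exit(0)

if __name__ == "__main__":
    main()

Moving load_db() and save_db() into the worker is just one way to keep all database mutation in a single process; the worker could instead send its finished dictionary back to the parent through a second queue.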

Upvotes: 10

James Scholes

Reputation: 7906

queue.Queue is thread-safe, but doesn't work across processes. This is quite easy to fix, though. Instead of:

from multiprocessing import Process
from Queue import Queue

You want:

from multiprocessing import Process, Queue
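As a minimal illustration of the difference (a sketch only): a multiprocessing.Queue created in the parent and passed to the child carries data across the process boundary, which a Queue.Queue cannot do. As the other answer explains, on Windows the queue must be passed to the worker explicitly rather than relied on as a module-level global:

from multiprocessing import Process, Queue

def worker(q):
    q.put("hello from the child process")

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # receives the item put by the child
    p.join()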

Upvotes: 4
