phdscm
phdscm

Reputation: 233

multiprocessing queue storing list of lists

I'm trying to enqueue a list of lists in a process (in the function named proc below), and then have the process terminate itself after I call event.set(). My function proc always finishes, judging by the printout, but the process itself is still going. I can get this to work if I make the number of lists enqueued in a call to put lower (batchperq variable) (or the size of each nested list smaller).

import multiprocessing as mp
import queue
import numpy as np
import time

def main():
    trainbatch_q = mp.Queue(10)

    batchperq = 50  
    event = mp.Event()

    tl1 = mp.Process(target=proc,
                            args=( trainbatch_q, 20, batchperq, event))
    tl1.start()
    time.sleep(3)
    event.set()
    tl1.join()
    print("Never printed..")    

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000 
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")

When I do a keyboard interrupt, I get the trace below... what could the "lock" it's trying to acquire be? Something to do with the queue?

Traceback (most recent call last):
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function
    _run_finalizers()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Upvotes: 4

Views: 1906

Answers (1)

K-Log
K-Log

Reputation: 618

The reason your process is never exiting is because you're never telling it to exit. I added a return to the end of your function and your process appears to exit correctly now.

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")
    return            # Added this line, You can have it return whatever is most relevant to you.

Here is the full program I ran including my changes to make it exit successfully.

import multiprocessing as mp
import queue
import numpy as np
import random
import time

def main():
    trainbatch_q = mp.Queue(10)
    batchperq = 50  
    event = mp.Event()

    tl1 = mp.Process(target=proc,
                            args=( trainbatch_q, 20, batchperq, event))
    print("Starting")
    tl1.start()
    time.sleep(3)
    event.set()
    tl1.join()
    print("Never printed..")    

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")
    return      # Added this line, You can have it return whatever is most relevant to you.

if __name__ == "__main__":
  main()

Hope this is helps.

Upvotes: 1

Related Questions