Reputation: 4229
I am reading a video file such that out for every 20 frames I'm storing first frames in Input Queue. Once I get all the required frames in Input Queue, then I run multiple processes to perform some operation on these frames and store the results in output queue. But the code always stuck at join, I tried different solutions proposed for such problems but none of them seems to work.
import numpy as np
import cv2
import timeit
import face_recognition
from multiprocessing import Process, Queue, Pool
import multiprocessing
import os
s = timeit.default_timer()
def alternative_process_target_func(input_queue, output_queue):
while not output_queue.full():
frame_no, small_frame, face_loc = input_queue.get()
print('Frame_no: ', frame_no, 'Process ID: ', os.getpid(), '----', multiprocessing.current_process())
#canny_frame(frame_no, small_frame, face_loc)
#I am just storing frame no for now but will perform something else later
output_queue.put((frame_no, frame_no))
if output_queue.full():
print('Its Full ---------------------------------------------------------------------------------------')
else:
print('Not Full')
print(timeit.default_timer() - s, ' seconds.')
print('I m not reading anymore. . .', os.getpid())
def alternative_process(file_name):
start = timeit.default_timer()
cap = cv2.VideoCapture(file_name)
frame_no = 1
fps = cap.get(cv2.CAP_PROP_FPS)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print('Frames Per Second: ', fps)
print('Total Number of frames: ', length)
print('Duration of file: ', int(length / fps))
processed_frames = 1
not_processed = 1
frames = []
process_this_frame = True
frame_no = 1
Input_Queue = Queue()
while (cap.isOpened()):
ret, frame = cap.read()
if not ret:
print('Size of input Queue: ', Input_Queue.qsize())
print('Total no of frames read: ', frame_no)
end1 = timeit.default_timer()
print('Time taken to fetch useful frames: ', end1 - start)
threadn = cv2.getNumberOfCPUs()
Output_Queue = Queue(maxsize=Input_Queue.qsize())
process_list = []
#quit = multiprocessing.Event()
#foundit = multiprocessing.Event()
for x in range((threadn - 1)):
# print('Process No : ', x)
p = Process(target=alternative_process_target_func, args=(Input_Queue, Output_Queue))#, quit, foundit
#p.daemon = True
p.start()
process_list.append(p)
#p.join()
# for proc in process_list:
# print('---------------------------------------------------------------', proc.p)
i = 1
for proc in process_list:
print('I am hanged here')
proc.join()
print('I am done')
i += 1
end = timeit.default_timer()
print('Time taken by face verification: ', end - start)
break
if process_this_frame:
print(frame_no)
small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
rgb_small_frame = small_frame[:, :, ::-1]
face_locations = face_recognition.face_locations(rgb_small_frame)
# frames.append((rgb_small_frame, face_locations))
Input_Queue.put((frame_no, rgb_small_frame, face_locations))
frame_no += 1
if processed_frames < 5:
processed_frames += 1
not_processed = 1
else:
if not_processed < 15:
process_this_frame = False
not_processed += 1
else:
processed_frames = 1
process_this_frame = True
print('-----------------------------------------------------------------------------------------------')
cap.release()
cv2.destroyAllWindows()
alternative_process('user_verification_2.avi')
Upvotes: 5
Views: 8209
Reputation: 19352
As the documentation on Process.join()
says, hanging (or "blocking") is exactly what is expected to happen:
Block the calling thread until the process whose
join()
method is called terminates or until the optional timeout occurs.
join()
stops current thread until the target process finishes. Target process is calling alternative_process_target_func
, so the problem is obviously in that function. It never finishes. There may be more than one reason for that.
alternative_process_target_func
runs until output_queue.full()
. What if it is never full? It never ends? It is really better to determine the end some other way, e.g. run until the input queue is empty.
input_queue.get()
will block if the input queue is empty. As the documentation says:
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available.
You are running multiple processes, so do not expect that there is something in input just because output_queue.full()
was False a moment ago, and because input size is the same as output size. A lot could have happened in the meantime.
What you want to do is:
try:
input_queue.get(False) # or input_queue.get_nowait()
except Empty:
break # stop when there is nothing more to read from the input
output_queue.put((frame_no, frame_no))
will block if there is no room in the output to store the data.
Again, you are assuming that there is room in output, just because you checked output_queue.full()
a few moments ago, and because input size is equal to output size. Never rely on such things.
You want to do the same thing as for input:
try:
output_queue.put((frame_no, frame_no), False)
# or output_queue.put_nowait((frame_no, frame_no))
except Empty:
# deal with this somehow, e.g.
raise Exception("There is no room in the output queue to write to.")
Upvotes: 4