Reputation: 507
EDIT: I've had questions about what the video stream is, so I will offer more clarity. The stream is a live video feed from my webcam, accessed via OpenCV. I get each frame as the camera reads it, and send it to a separate process for processing. The process returns text based on computations done on the image. The text is then displayed onto the image. I need to display the stream in realtime, and it is ok if there is a lag between the text and the video being shown (i.e. if the text was applicable to a previous frame, that's ok).
Perhaps an easier way to think of this is that I'm doing image recognition on what the webcam sees. I send one frame at a time to a separate process to do recognition analysis on the frame, and send the text back to be put as a caption on the live feed. Obviously the processing takes more time than simply grabbing frames from the webcam and showing them, so if there is a delay in what the caption is and what the webcam feed shows, that's acceptable and expected.
What's happening now is that the live video I'm displaying is lagging due to the other processes (when I don't send frames to the process for computing, there is no lag). I've also ensured only one frame is enqueued at a time so avoid overloading the queue and causing lag. I've updated the code below to reflect this detail.
I'm using the multiprocessing module in python to help speed up my main program. However I believe I might be doing something incorrectly, as I don't think the computations are happening quite in parallel.
I want my program to read in images from a video stream in the main process, and pass on the frames to two child processes that do computations on them and send text back (containing the results of the computations) to the main process.
However, the main process seems to lag when I use multiprocessing, running about half as fast as without it, leading me to believe that the processes aren't running completely in parallel.
After doing some research, I surmised that the lag may have been due to communicating between the processes using a queue (passing an image from the main to the child, and passing back text from child to main).
However I commented out the computational step and just had the main process pass an image and the child return blank text, and in this case, the main process did not slow down at all. It ran at full speed.
Thus I believe that either
1) I am not optimally using multiprocessing
OR
2) These processes cannot truly be run in parallel (I would understand a little lag, but it's slowing the main process down in half).
Here's a outline of my code. There is only one consumer instead of 2, but both consumers are nearly identical. If anyone could offer guidance, I would appreciate it.
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
#other initialization stuff
def run(self):
while True:
image = self.task_queue.get()
#Do computations on image
self.result_queue.put("text")
return
import cv2
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
else:
rval = False
while rval:
if tasks.empty():
tasks.put(image)
else:
text = tasks.get()
#Add text to frame
cv2.putText(frame,text)
#Showing the frame with all the applied modifications
cv2.imshow("preview", frame)
#Getting next frame from camera
rval, frame = vc.read()
Upvotes: 15
Views: 5327
Reputation: 4277
Here's a more elegant (IMHO) solution that utilizes multiple processes for processing your frames:
def process_image(args):
image, frame = args
#Do computations on image
return "text", frame
import cv2
pool = multiprocessing.Pool()
def image_source():
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
else:
rval = False
while rval:
yield image, frame
# Getting next frame from camera
rval, frame = vc.read()
for (text, frame) in pool.imap(process_image, image_source()):
# Add text to frame
cv2.putText(frame, text)
# Showing the frame with all the applied modifications
cv2.imshow("preview", frame)
Pool.imap
should allow you to iterate through the pool's results while it's still processing other images from your cam.
Upvotes: 3
Reputation: 2562
After reading your comment to my previous answer, I understood your problem a 'bit' better. I would like to have more information about your code/problem. Anyway, and because this code is significantly different than my previous answer, I decided to provide another answer. I won't comment the code too much though, because you can follow it from my previous answer. I will use text instead of images, just to simulate the process.
The following code prints a letter out of "lorem ipsum", selecting one out of 6 letters (frames). Because there is a lag, we need a buffer that I implemented with a deque. After the buffer has advanced, the displaying of the frame and caption are in sync.
I don't know how often you tag a frame, or how much it really takes to process it, but you can have an educated guess with this code by playing with some of the variables.
import time
import random
random.seed(1250)
from multiprocessing import Pool, Manager
from collections import deque
def display_stream(stream, pool, queue, buff, buffered=False):
delay = 24
popped_frames = 0
for i, frame in enumerate(stream):
buff.append([chr(frame), ''])
time.sleep(1/24 * random.random()) # suppose a 24 fps video
if i % 6 == 0: # suppose one out of 6 frames
pool.apply_async(process_frame, (i, frame, queue))
ii, caption = (None, '') if queue.empty() else queue.get()
if buffered:
if ii is not None:
buff[ii - popped_frames][1] = caption
if i > delay:
print(buff.popleft())
popped_frames += 1
else:
lag = '' if ii is None else i - ii
print(chr(frame), caption, lag)
else:
pool.close()
pool.join()
if buffered:
try:
while True:
print(buff.popleft())
except IndexError:
pass
def process_frame(i, frame, queue):
time.sleep(0.4 * random.random()) # suppose ~0.2s to process
caption = chr(frame).upper() # mocking the result...
queue.put((i, caption))
if __name__ == '__main__':
p = Pool()
q = Manager().Queue()
d = deque()
stream = b'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.'
display_stream(stream, p, q, b)
Upvotes: 1
Reputation: 5270
I want my program to read in images from a video stream in the main process
In producer/consumer implementations, which is what you have above, the producer, what puts tasks into the queue to be executed by the consumers, needs to be separate from the main/controlling process so that it can add tasks in parallel with the main process reading output from results queue.
Try the following. Have added a sleep in the consumer processes to simulate processing and added a second consumer to show they are being run in parallel.
It would also be a good idea to limit the size of the task queue to avoid having it run away with memory usage if processing cannot keep up with input stream. Can specify a size when calling Queue(<size>)
. If the queue is at that size, calls to .put
will block until the queue is not full.
import time
import multiprocessing
import cv2
class ImageProcessor(multiprocessing.Process):
def __init__(self, tasks_q, results_q):
multiprocessing.Process.__init__(self)
self.tasks_q = tasks_q
self.results_q = results_q
def run(self):
while True:
image = self.tasks_q.get()
# Do computations on image
time.sleep(1)
# Display the result on stream
self.results_q.put("text")
# Tasks queue with size 1 - only want one image queued
# for processing.
# Queue size should therefore match number of processes
tasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()
processor = ImageProcessor(tasks_q, results_q)
processor.start()
def capture_display_video(vc):
rval, frame = vc.read()
while rval:
image = frame.get_image()
if not tasks_q.full():
tasks_q.put(image)
if not results_q.empty():
text = results_q.get()
cv2.putText(frame, text)
cv2.imshow("preview", frame)
rval, frame = vc.read()
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
if not vc.isOpened():
raise Exception("Cannot capture video")
capture_display_video(vc)
processor.terminate()
Upvotes: 9
Reputation: 1359
(Updated solution based on you last code sample)
It will get images from the stream, put one in the task queue as soon as it is available, and display the last image with the last text.
I put some active loop in there to simulate a processing longer than the time between two images. I means that the text displayed is not necessarily the one belonging to the image, but the last one computed. If the processing is fast enough, the shift between image and text should be limited.
Note that I force calls to get/put with some try/catch. Per the doc, empty and full are not 100% accurate.
import cv2
import multiprocessing
import random
from time import sleep
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
# Other initialization stuff
def run(self):
while True:
frameNum, frameData = self.task_queue.get()
# Do computations on image
# Simulate a processing longer than image fetching
m = random.randint(0, 1000000)
while m >= 0:
m -= 1
# Put result in queue
self.result_queue.put("result from image " + str(frameNum))
return
# No more than one pending task
tasks = multiprocessing.Queue(1)
results = multiprocessing.Queue()
# Init and start consumer
consumer = Consumer(tasks,results)
consumer.start()
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)
else:
rval = False
# Dummy int to represent frame number for display
frameNum = 0
# String for result
text = None
font = cv2.FONT_HERSHEY_SIMPLEX
# Process loop
while rval:
# Grab image from stream
frameNum += 1
# Put image in task queue if empty
try:
tasks.put_nowait((frameNum, frame))
except:
pass
# Get result if ready
try:
# Use this if processing is fast enough
# text = results.get(timeout=0.4)
# Use this to prefer smooth display over frame/text shift
text = results.get_nowait()
except:
pass
# Add last available text to last image and display
print("display:", frameNum, "|", text)
# Showing the frame with all the applied modifications
cv2.putText(frame,text,(10,25), font, 1,(255,0,0),2)
cv2.imshow("preview", frame)
# Getting next frame from camera
rval, frame = vc.read()
# Optional image resize
# frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)
Here is some output, you can see the delay between image and result, and the result catching back.
> ('display:', 493, '|', 'result from image 483')
> ('display:', 494, '|', 'result from image 483')
> ('display:', 495, '|', 'result from image 489')
> ('display:', 496, '|', 'result from image 490')
> ('display:', 497, '|', 'result from image 495')
> ('display:', 498, '|', 'result from image 496')
Upvotes: 2
Reputation: 2562
Let me suggest you a slightly different approach, with a lot less "red tape". I think the main problem is that you are overlooking the main way to communicate with a process: it is through its arguments and return values. If you can send the frame data as an argument, there is no need of queues or pipes or other methods.
import time
from multiprocessing import Pool
def process_frame(frame_id, frame_data):
# this function simulates the processing of the frame.
# I used a longer sleep thinking that it takes longer
# and therefore the reason of parallel processing.
print("..... got frame {}".format(frame_id))
time.sleep(.5)
char = frame_data[frame_id]
count = frame_data.count(char)
return frame_id, char, count
def process_result(res):
# this function simulates the function that would receive
# the result from analyzing the frame, and do what is
# appropriate, like printing, making a dict, saving to file, etc.
# this function is called back when the result is ready.
frame_id, char, count = res
print("in frame {}".format(frame_id), \
", character '{}' appears {} times.".format(
chr(char), count))
if __name__ == '__main__':
pool = Pool(4)
# in my laptop I got these times:
# workers, time
# 1 10.14
# 2 5.22
# 4 2.91
# 8 2.61 # no further improvement after 4 workers.
# your case may be different though.
from datetime import datetime as dt
t0 = dt.now()
for i in range(20): # I limited this loop to simulate 20 frames
# but it could be a continuous stream,
# that when finishes should execute the
# close() and join() methods to finish
# gathering all the results.
# The following lines simulate the video streaming and
# your selecting the frames that you need to analyze and
# send to the function process_frame.
time.sleep(0.1)
frame_id = i
frame_data = b'a bunch of binary data representing your frame'
pool.apply_async( process_frame, #func
(frame_id, frame_data), #args
callback=process_result #return value
)
pool.close()
pool.join()
print(dt.now() - t0)
I think that this simpler approach would be enough for your program. No need of using classes or queues.
Upvotes: 1
Reputation: 3154
You could try setting the affinity mask to make sure each process runs on a different core. I use this on windows 7.
def setaffinity(mask = 128): # 128 is core 7
pid = win32api.GetCurrentProcessId()
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
win32process.SetProcessAffinityMask(handle, mask)
return
Upvotes: -2