Reputation: 691
I am using pafy to stream a set of YouTube videos, with the aim of combining them (split-screen style) and displaying them as one video. It works, but the frame rate drops badly above two videos because a frame is fetched from each stream in turn; with 9 videos (for a 3x3 stitch) fetching the frames takes 0.1725 secs per loop, which is too slow.
I figured the best way to reduce this was to fetch the streams in a parallel/multiprocess way.
I tried using Pipes and multiprocessing, but I am getting an EOFError: Ran out of input.
See the code below; comment the frames = line in/out to switch between the working-but-slow method and my attempt at multiprocessing.
import multiprocessing
import cv2
import numpy as np
import pafy
import typing
import timeit
urls = [
"https://www.youtube.com/watch?v=tT0ob3cHPmE",
"https://www.youtube.com/watch?v=XmjKODQYYfg",
"https://www.youtube.com/watch?v=E2zrqzvtWio",
"https://www.youtube.com/watch?v=6cQLNXELdtw",
"https://www.youtube.com/watch?v=s_rmsH0wQ3g",
"https://www.youtube.com/watch?v=QfhpNe6pOqU",
"https://www.youtube.com/watch?v=C_9x0P0ebNc",
"https://www.youtube.com/watch?v=Ger6gU_9v9A",
"https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920, 1080
def main():
    streams = [pafy.new(url).getbest() for url in urls]
    videos = [cv2.VideoCapture() for _ in streams]
    [video.open(best.url) for video, best in zip(videos, streams)]
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
        frames = get_frames(videos)
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        cv2.imshow('Video', dst)
        if cv2.waitKey(1) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)
    [video.release() for video in videos]
    cv2.destroyAllWindows()
def get_frames(videos):
    # frames = [video.read()[-1] for video in videos]
    jobs = []
    pipe_list = []
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames

def get_frame(video, send_end):
    send_end.send(video.read()[1])
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))
def merge_frames(frames: typing.List[np.ndarray]):
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    return np.vstack(rows)

if __name__ == '__main__':
    main()
Upvotes: 1
Views: 2487
Reputation: 955
Interesting application! Re the error: I ran your code, and the message is that it can't pickle the VideoCapture object (see the links below), which is probably why the receiving pipe is empty. There are two errors from two processes: first the pickling error, then the EOF.
EDIT #2: I managed to run it with one process per video:
Regarding performance, I first ran it without merging the images (I had to fix some details) to verify that frames were being received. With 3 and 4 streams displayed in separate windows from the receiving processes, it played very fast, faster than real time. I think the merging and resizing for display are the slow part: the merged picture is 2560x1440 for 4 streams (4 x 1280x720), and in my case it is resized again to fit the screen.
Thanks for sharing the question and the library!
(BTW, I initially tried with a lock as well, but it turned out not to be necessary. The code still needs some experimental leftovers cleaned out. Also, the current implementation might not be synchronized per frame across streams, because it doesn't join per frame the way your original example did; that version created new processes to grab one frame from each stream and then merged them.)
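The failure can be reproduced without OpenCV at all: any object wrapping an OS-level handle behaves like VideoCapture here. As a sketch, a thread lock is a convenient stand-in that fails with the same kind of TypeError:

```python
import pickle
import threading

# A thread lock wraps a native OS handle, much like cv2.VideoCapture
# wraps a native capture context; neither can cross a process boundary.
lock = threading.Lock()
try:
    pickle.dumps(lock)
except TypeError as exc:
    print(exc)  # cannot pickle '_thread.lock' object
```

The fix in both cases is the same: don't send the handle, send the picklable information (here, the URL) needed to recreate it inside the child process.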
The CPU load is mainly in the main process (a 4-core CPU, thus max=25% per instance):
Some times:
0.06684677699999853
0.030737616999999773
1.2829999995744856e-06
LEN(FRAMES)= 9
0.06703700200000284
0.030708104000002123
6.409999997458726e-07
LEN(FRAMES)= 9
The waitKey in the main loop can be tweaked.
https://github.com/Twenkid/Twenkid-FX-Studio/blob/master/Py/YoutubeAggregatorPafy/y6.py
# Merging Youtube streams with pafy, opencv and multiprocessing
# Base code by Fraser Langton - Thanks!
# Refactored and debugged by Twenkid
# version y6 - more cleaning of unused code, properly close VideoCapture in the processes
import multiprocessing  # Process, Lock
from multiprocessing import Lock  # Not needed in the end
import cv2
import numpy as np
import pafy
import typing
import timeit
import time
urls = [
"https://www.youtube.com/watch?v=tT0ob3cHPmE",
"https://www.youtube.com/watch?v=XmjKODQYYfg",
"https://www.youtube.com/watch?v=E2zrqzvtWio",
"https://www.youtube.com/watch?v=6cQLNXELdtw",
"https://www.youtube.com/watch?v=s_rmsH0wQ3g",
"https://www.youtube.com/watch?v=QfhpNe6pOqU",
"https://www.youtube.com/watch?v=C_9x0P0ebNc",
"https://www.youtube.com/watch?v=Ger6gU_9v9A",
"https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
# Merging seems to require a square number of streams (2x2, 3x3, etc.). The resolutions should be the same.
'''
[
"https://www.youtube.com/watch?v=C_9x0P0ebNc",
"https://www.youtube.com/watch?v=Ger6gU_9v9A",
"https://www.youtube.com/watch?v=39dZ5WhDlLE",
"https://www.youtube.com/watch?v=QfhpNe6pOqU",
]
'''
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920, 1080
streams = []
#bestStreams = []
def main():
    global bestStreams
    streams = [pafy.new(url).getbest() for url in urls]
    print(streams)
    cv2.waitKey(0)
    videos = [cv2.VideoCapture() for _ in streams]
    bestURLS = []
    # [video.open(best.url) for video, best in zip(videos, streams)]  # Opened per process instead
    [bestURLS.append(best.url) for best in streams]
    print(bestURLS)
    cv2.waitKey(0)
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    LOCK = Lock()
    # proc = get_framesUL(bestStreams, LOCK)
    proc, pipes = get_framesULJ(bestURLS, LOCK)
    print("PROC, PIPES", proc, pipes)
    frames = []
    numStreams = len(streams)
    while True:
        start_time = timeit.default_timer()
        # frames = get_framesUL(streams, LOCK)
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        frames = [x.recv() for x in pipes]
        lf = len(frames)
        print("LEN(FRAMES)=", lf)
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        try:
            cv2.imshow('Video', dst)
        except:
            print("Skip")
        if cv2.waitKey(20) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)
    for p in proc:  # was `jobs`, which is undefined in main
        p.join()
    # [video.release() for video in videos]  # Released per process
    cv2.destroyAllWindows()
def get_framesULJ(videosURL, L):  # return the processes; join in main and read the frames there
    print("get_framesULJ:", videosURL)
    jobs = []
    pipe_list = []
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()
    return jobs, pipe_list

def get_frame2L(videoURL, send_end, L):
    v = cv2.VideoCapture()
    v.open(videoURL)
    print("get_frame2", videoURL, v, send_end)
    while True:
        ret, frame = v.read()
        if ret:
            send_end.send(frame)
        else:
            print("NOT READ!")
            break
    v.release()  # properly close the capture in the process
def get_framesUL(videosURL, L):
    jobs = []
    pipe_list = []
    print("VIDEOS:", videosURL)
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames
def get_frames(videos, L):
    jobs = []
    pipe_list = []
    print("VIDEOS:", videos)
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames

def get_frame(video, send_end, L):
    L.acquire()
    print("get_frame", video, send_end)
    send_end.send(video.read()[1])
    L.release()
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))
def get_frame2(videoURL, send_end):  # unused; superseded by get_frame2L
    v = cv2.VideoCapture(videoURL)  # was `video.open(videoURL)`, with `video` undefined
    while True:
        ret, frame = v.read()
        if ret:
            send_end.send(frame)
        else:
            break

def merge_frames(frames: typing.List[np.ndarray]):
    # debug: cv2.imshow the individual frames here if the mosaic looks wrong
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    return np.vstack(rows)

if __name__ == '__main__':
    main()
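The grid merge can be checked in isolation with dummy frames. This is a self-contained sketch of the same logic as merge_frames above, using math.ceil instead of the deprecated np.math:

```python
import math
import numpy as np

def merge_frames(frames):
    # Arrange n same-shaped frames into a ceil(sqrt(n))-wide grid.
    width = math.ceil(math.sqrt(len(frames)))
    rows = [np.hstack(frames[width * r: width * r + width])
            for r in range(width)]
    return np.vstack(rows)

# Four dummy 2x3 "frames" filled with their own index.
frames = [np.full((2, 3), i, dtype=np.uint8) for i in range(4)]
print(merge_frames(frames).shape)  # (4, 6): a 2x2 mosaic
```

Note that np.hstack/np.vstack require equal shapes per row and column, which is why a square count of equally sized streams (4, 9, ...) is needed; a non-square count leaves the last row short and vstack fails.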
EDIT #1 IDEA: Create one process per video stream and read it in a loop (pumping frames into a pipe), instead of a new process for each frame; open the videos/VideoCapture objects inside the child process from a videoURL passed in the args, instead of sending a VideoCapture object. (I don't know whether that form would have hit the same pickle issue.)
...
in main (note: the URL list must be filled before it is passed):
bestURLS = []
[bestURLS.append(best.url) for best in streams]
proc, pipes = get_framesULJ(bestURLS, LOCK)
def get_frame2(videoURL, send_end):
    v = cv2.VideoCapture(videoURL)  # open inside the process; only the URL was pickled
    while True:
        ret, frame = v.read()
        if ret:
            send_end.send(frame)
        else:
            break
def get_framesULJ(videosURL, L):  # return the processes, join in main and read the frames there
    print("get_framesULJ:", videosURL)
    jobs = []
    pipe_list = []
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()
    return jobs, pipe_list
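Stripped of the video specifics, the pattern is: pass only a picklable handle (a URL string) and let each worker own its unpicklable resource. A minimal sketch, where producer and start_workers are hypothetical stand-ins for get_frame2L and get_framesULJ:

```python
import multiprocessing

def producer(source_id, send_end):
    # The worker would open its own capture here; only the picklable
    # source_id (standing in for the stream URL) crossed the boundary.
    for frame_no in range(3):  # stand-in for the v.read() loop
        send_end.send((source_id, frame_no))
    send_end.close()

def start_workers(sources):
    jobs, pipes = [], []
    for src in sources:
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=producer, args=(src, send_end))
        p.start()
        jobs.append(p)
        pipes.append(recv_end)
    return jobs, pipes

if __name__ == '__main__':
    jobs, pipes = start_workers(['stream-a', 'stream-b'])
    print([pipe.recv() for pipe in pipes])  # one tuple per stream
    for p in jobs:
        p.join()
```

Because only strings and tuples cross the Pipe, this spawns cleanly on Windows (spawn start method) as well as on fork-based platforms.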
Original answer:
<multiprocessing.connection.PipeConnection object at 0x000000000D3C7D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3BD2E0>
Traceback (most recent call last):
  File "y.py", line 104, in <module>
    main()
  File "y.py", line 48, in main
    frames = get_frames(videos)
  File "y.py", line 80, in get_frames
    p.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'cv2.VideoCapture' object

Z:\>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
It fails inside p.start(), when the Process object is pickled to be sent to the child. The instances themselves are created, and the structures look OK:
VIDEOS: [<VideoCapture 000000000D418710>, <VideoCapture 000000000D4186F0>, <VideoCapture 000000000D418B70>]
<multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3B62E0>
P = <Process name='Process-1' parent=8892 initial>
JOBS, len [<Process name='Process-1' parent=8892 initial>] 1
RECV_END <multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90>
See the pickle module documentation:
https://docs.python.org/3/library/pickle.html
Not everything can be pickled.
What can be pickled and unpickled? The following types can be pickled:
- None, True, and False
- integers, floating-point numbers, complex numbers
- strings, bytes, bytearrays
- tuples, lists, sets, and dictionaries containing only picklable objects
- functions defined at the top level of a module (using def, not lambda)
- built-in functions defined at the top level of a module
- classes that are defined at the top level of a module
- instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see the section "Pickling Class Instances" for details)
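As a quick sketch of the boundary that list draws, plain containers round-trip through pickle, while a lambda (not a named top-level def) does not:

```python
import pickle

# Containers of plain data pickle fine:
data = pickle.loads(pickle.dumps({"frames": [1, 2, 3]}))
print(data)  # {'frames': [1, 2, 3]}

# A lambda has no importable top-level name, so it does not:
try:
    pickle.dumps(lambda x: x)
except Exception as exc:
    print(type(exc).__name__)
```

That's the category the VideoCapture object falls into as well: no way to reconstruct it from serialized state alone.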
Besides, it seems there was a bug in OpenCV causing this. One of the solutions given is to turn multiprocessing off...
Python multiprocess can't pickle opencv videocapture object
https://github.com/MVIG-SJTU/AlphaPose/issues/164
Fang-Haoshu commented on 17 Oct 2018:
"This bug is due to multi-processing in opencv. --sp disables multi-processing. BTW, can you tell me the version of opencv that you are using?"
I guess it is something about locked memory. A workaround I would try is to dump the pixels as plain/raw data first, perhaps with a small header describing the size etc.
Also, in general, I think some buffering needs to be added for smoother playback.
BTW, what version is your OpenCV? Mine is 4.2.0.
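One way such buffering could look, as a sketch: a bounded queue between a reader thread and the display loop, so decoding hiccups don't stall display directly. The reader function and the range() source are hypothetical stand-ins for the per-stream capture loop:

```python
import queue
import threading

def reader(source, buf):
    # Reader thread: pull frames as fast as the source yields them and
    # push into a bounded buffer; put() blocks when display falls behind.
    for frame in source:
        buf.put(frame)
    buf.put(None)  # sentinel: end of stream

buf = queue.Queue(maxsize=30)  # roughly 1 second of headroom at 30 fps
t = threading.Thread(target=reader, args=(range(5), buf), daemon=True)
t.start()

shown = []
while True:
    frame = buf.get()  # display loop: stand-in for cv2.imshow + waitKey
    if frame is None:
        break
    shown.append(frame)
t.join()
print(shown)  # [0, 1, 2, 3, 4]
```

With real streams the display loop would drop or skip frames when the buffer runs dry, rather than block on a slow stream.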
Upvotes: 5