Fraser Langton

Reputation: 691

Use multiprocessing to read multiple video streams?

I am using pafy to stream a set of YouTube videos, with the aim of combining them (split-screen style) and displaying them as one video. It works, but the frame rate is very slow with more than two videos because a frame is fetched from each stream one after another. With 9 videos (for a 3x3 stitch), fetching the frames takes 0.1725 secs (too slow).

I figured the best way to reduce this was to fetch the streams in a parallel/multiprocess way.

I tried using Pipes and multiprocessing, but I am getting an EOFError: Ran out of input.

See the code below. Comment the frames = line out/in to switch between the working-but-slow method and my attempt at multiprocessing.

import multiprocessing
import cv2
import numpy as np
import pafy
import typing
import timeit

urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",

    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",

    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
width = int(np.ceil(np.sqrt(len(urls))))  # grid side length (np.math no longer exists in NumPy 2.x)
dim = 1920, 1080


def main():
    streams = [pafy.new(url).getbest() for url in urls]

    videos = [cv2.VideoCapture() for streams in streams]

    [video.open(best.url) for video, best in zip(videos, streams)]

    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
        frames = get_frames(videos)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        cv2.imshow('Video', dst)

        if cv2.waitKey(1) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)

        continue

    [video.release() for video in videos]
    cv2.destroyAllWindows()


def get_frames(videos):
    # frames = [video.read()[-1] for video in videos]

    jobs = []
    pipe_list = []
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames


def get_frame(video, send_end):
    send_end.send(video.read()[1])
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))


def merge_frames(frames: typing.List[np.ndarray]):
    width = int(np.ceil(np.sqrt(len(frames))))  # grid side length (np.math no longer exists in NumPy 2.x)
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    return np.vstack(rows)


if __name__ == '__main__':
    main()

Upvotes: 1

Views: 2487

Answers (1)

Twenkid

Reputation: 955

Interesting application! Regarding the error: I ran your code, and the message is that it can't pickle the VideoCapture object (see the links below). That's probably why the receiving pipe is empty. There are two errors from two processes: first the pickle error in the parent, then the EOFError in the child that was being spawned.
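To see this failure mode without OpenCV or a network connection, here is a minimal sketch that checks whether an object survives pickling before it is handed to a Process. The helper name is_picklable is made up for illustration, and a threading.Lock only stands in for the VideoCapture object, because it is another object that pickle refuses.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


# A plain URL string can be pickled, so it is safe to pass as a Process argument.
url_ok = is_picklable("https://www.youtube.com/watch?v=tT0ob3cHPmE")

# A lock wraps an OS-level resource and cannot be pickled. It is only a
# stand-in for cv2.VideoCapture, which is rejected in the traceback below.
lock_ok = is_picklable(threading.Lock())

if __name__ == "__main__":
    print("URL string picklable:", url_ok)
    print("Lock picklable:", lock_ok)
```

Running a check like this on every argument of a Process is a cheap way to find out which one is the culprit.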

EDIT #2: I managed to run it with one process per video; the full code is below.

Regarding performance: I first ran it without merging the images (I had to fix some details) to check that frames were being received. With 3-4 streams, each shown in a separate window directly from the worker processes, playback was very fast, faster than real time. I think the merging and resizing for display are the slow part: the merged picture is 2560x1440 for 4 streams (a 2x2 grid of 1280x720 tiles). In my case it's resized to fit the screen.
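The size arithmetic behind that observation can be sketched as follows. The function names are made up for illustration. The tile size follows the same dim[0] // width idea as the commented-out cv2.resize line in the question.

```python
import math


def grid_side(n_streams):
    """Side length of the smallest square grid that holds n_streams tiles."""
    return math.ceil(math.sqrt(n_streams))


def mosaic_size(n_streams, frame_w, frame_h):
    """Size of the merged picture when frames are stacked unscaled."""
    side = grid_side(n_streams)
    return side * frame_w, side * frame_h


def tile_size(n_streams, screen_w, screen_h):
    """Per-tile size so that the merged picture is at most screen-sized."""
    side = grid_side(n_streams)
    return screen_w // side, screen_h // side


if __name__ == "__main__":
    print(mosaic_size(4, 1280, 720))   # 4 streams of 1280x720, stacked unscaled
    print(tile_size(9, 1920, 1080))    # 9 streams on a 1920x1080 screen
```

If each worker shrinks its frame to the tile size before sending it, less data should go through the pipe and the stacking in the main process works on smaller arrays. I have not measured how much that helps.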

Thanks for sharing that question and that library etc!

(BTW, I initially tried with a lock as well, but it turned out not to be necessary. The code still contains leftovers from some experiments and needs cleaning. Also, the current implementation might not be synchronized per frame across streams, because it doesn't join per frame as your original example did, which created new processes to grab one frame from each stream and then merged them.)
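One possible way to limit drift between streams (an assumption on my side, not what the code below does) is to take only the newest pending frame from each pipe on every display tick and drop the rest. A minimal sketch, with the hypothetical helper latest and small integers standing in for frames:

```python
import multiprocessing


def latest(conn, fallback=None):
    """Drain conn and return the newest pending item, or fallback if none is pending."""
    item = fallback
    while conn.poll():      # True while at least one item is waiting
        item = conn.recv()
    return item


if __name__ == "__main__":
    recv_end, send_end = multiprocessing.Pipe(False)
    for n in (1, 2, 3):     # small integers stand in for frames
        send_end.send(n)
    print(latest(recv_end))          # newest pending item
    print(latest(recv_end, "old"))   # nothing pending, so the fallback is returned
```

In a display loop the fallback would be the previously shown frame of that stream. The sketch does not handle a pipe whose sending end has been closed, and it trades completeness for freshness because it drops frames.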

The CPU load is mainly in the main process (a 4-core CPU, thus max = 25% per process).

Some timings (seconds):

0.06684677699999853
0.030737616999999773
1.2829999995744856e-06
LEN(FRAMES)= 9
0.06703700200000284
0.030708104000002123
6.409999997458726e-07
LEN(FRAMES)= 9

The waitKey in the main loop can be tweaked.
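As a rough sketch of such tweaking, the delay can be derived from a target frame rate and the time the iteration has already used. The helper wait_ms and the 25 fps target are assumptions for illustration. A real rate could be queried with VideoCapture.get(cv2.CAP_PROP_FPS), though it may not be reported for every stream.

```python
import timeit


def wait_ms(target_fps, elapsed_s, minimum_ms=1):
    """Milliseconds to pass to cv2.waitKey so one loop iteration lasts about 1/target_fps."""
    budget_ms = 1000.0 / target_fps
    remaining = budget_ms - elapsed_s * 1000.0
    # waitKey(0) would block until a key press, so never go below minimum_ms.
    return max(minimum_ms, int(remaining))


if __name__ == "__main__":
    start = timeit.default_timer()
    # ... receive, merge and show the frames here ...
    elapsed = timeit.default_timer() - start
    print(wait_ms(25, elapsed))
```

The result would replace the fixed 20 in cv2.waitKey(20), so slow iterations wait less and fast ones wait more.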

Code

https://github.com/Twenkid/Twenkid-FX-Studio/blob/master/Py/YoutubeAggregatorPafy/y6.py

# Merging Youtube streams with pafy, opencv and multiprocessing
# Base code by Fraser Langton - Thanks!
# Refactored and debugged by Twenkid
# version y6 - more cleaning of unused code, properly close VideoCapture in the processes

import multiprocessing #Process, Lock
from multiprocessing import Lock # Not needed
import cv2
import numpy as np
import pafy
import typing
import timeit
import time

urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",

    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",

    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A", 
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]

# Merging seems to require equal number of sides, so 2x2, 3x3 etc. The  resolutions should be the same.
'''
[    
    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE",   
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",
]
'''

width = int(np.ceil(np.sqrt(len(urls))))  # grid side length (np.math no longer exists in NumPy 2.x)
dim = 1920, 1080

streams = []
#bestStreams = []

def main():
    global bestStreams
    streams = [pafy.new(url).getbest() for url in urls]
    print(streams)
    #[bestStreams for best in streams]
    #print(bestStreams)
    cv2.waitKey(0)
    videos = [cv2.VideoCapture() for streams in streams]
    bestURLS = [] 
    #[video.open(best.url) for video, best in zip(videos, streams)]  # Opened per process
    [bestURLS.append(best.url) for best in streams]
    
    #[ for video, best in zip(videos, streams)]
    print(bestURLS)
    cv2.waitKey(0)
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    LOCK = Lock()
    #proc = get_framesUL(bestStreams, LOCK)
    #proc, pipes = get_framesULJ(bestStreams, LOCK)
    proc, pipes = get_framesULJ(bestURLS, LOCK)     
    print("PROC, PIPES", proc, pipes)
    #cv2.waitKey(0)
    frames = []
    numStreams = len(streams)
    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
        #frames = get_frames(videos, LOCK)
        #frames = get_framesUL(streams, LOCK)
        
        
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        
        frames = [x.recv() for x in pipes]
        lf = len(frames)
        print("LEN(FRAMES)=", lf);
        #if lf<3: time.sleep(3); print("LEN(FRAMES)=", lf); #continue #Else merge and show
        #proc.join()
        #elif lf==3: frames = [x.recv() for x in pipes]
                
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
         
        start_time = timeit.default_timer()      
        #if cv2!=None:
        try:
          cv2.imshow('Video', dst)
        except: print("Skip")
        #cv2.waitKey(1)  

        if cv2.waitKey(20) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)

        continue
        
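    # "proc" is the list of worker processes returned by get_framesULJ.
    # The workers loop until their stream ends, so stop them explicitly
    # instead of waiting on a join() that may never return.
    for p in proc:
        p.terminate()
        p.join()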
        
    # [video.release() for video in videos] # Per process
    cv2.destroyAllWindows()



def get_framesULJ(videosURL, L): #return the processes, join in main and read the frames there
    # frames = [video.read()[-1] for video in videos]
    print("get_framesULJ:",videosURL)    
    jobs = []
    pipe_list = []
    #print("VIDEOS:",videosURL)    
    #for video in videos:
    for videoURL in videosURL: #urls:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()
        #cv2.waitKey(0)

    #for proc in jobs:
    #    proc.join()

    #frames = [x.recv() for x in pipe_list]
    #return frames
    #cv2.waitKey(0)
    return jobs, pipe_list

def get_frame2L(videoURL, send_end, L):
    v = cv2.VideoCapture()
    #[video.open(best.url)
    #L.acquire()
    v.open(videoURL)
    print("get_frame2", videoURL, v, send_end)
    #cv2.waitKey(0)
    while True:      
      ret, frame = v.read()
      if ret: send_end.send(frame); #cv2.imshow("FRAME", frame); cv2.waitKey(1)   
      else: print("NOT READ!"); break
    v.release()  # close the VideoCapture in the process that opened it
    #send_end.send(v.read()[1])
    #L.release()
    
def get_framesUL(videosURL, L):
    # frames = [video.read()[-1] for video in videos]

    jobs = []
    pipe_list = []
    print("VIDEOS:",videosURL)    
    #for video in videos:
    for videoURL in videosURL: #urls:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames


def get_frames(videos, L):
    # frames = [video.read()[-1] for video in videos]

    jobs = []
    pipe_list = []
    print("VIDEOS:",videos)    
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames
    
def get_frame(video, send_end, L):
    L.acquire()
    print("get_frame", video, send_end)
    send_end.send(video.read()[1])
    L.release()
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))

    
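def get_frame2(videoURL, send_end):
    # Open the capture inside the worker: only the URL string crosses the process boundary
    v = cv2.VideoCapture()
    v.open(videoURL)
    while True:
      ret, frame = v.read()
      if ret: send_end.send(frame)
      else: break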
      
    
def merge_frames(frames: typing.List[np.ndarray]):
    #cv2.imshow("FRAME0", frames[0]) ########## not images/small
    #cv2.imshow("FRAME1", frames[1]) ##########
    #cv2.imshow("FRAME2", frames[2]) ##########
    #cv2.imshow("FRAME3", frames[3]) ##########
    #cv2.waitKey(1)
    width = int(np.ceil(np.sqrt(len(frames))))  # grid side length (np.math no longer exists in NumPy 2.x)
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    
    
    return np.vstack(rows)


if __name__ == '__main__':
    main()
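The comment in the code notes that merging needs a full square grid (2x2, 3x3 etc.). With other counts, the rows handed to np.vstack no longer have equal widths. One way around that, sketched here as an assumption rather than as part of the code above, is to pad the list with filler tiles first. The helper name grid_rows is made up, and strings stand in for frames so the sketch runs without NumPy.

```python
import math


def grid_rows(tiles, filler):
    """Split tiles into rows of a square grid, padding the last cells with filler."""
    side = math.ceil(math.sqrt(len(tiles)))
    padded = list(tiles) + [filler] * (side * side - len(tiles))
    return [padded[r * side:(r + 1) * side] for r in range(side)]


if __name__ == "__main__":
    # Strings stand in for frames; with NumPy frames, filler would be a black
    # image such as np.zeros_like(frames[0]), and each row would go through
    # np.hstack before a final np.vstack, as in merge_frames above.
    for row in grid_rows(["a", "b", "c", "d", "e"], "-"):
        print(row)
```

The tiles still need identical resolutions, as the same comment points out.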

EDIT #1 IDEA: Create one process per video stream and read it in a loop (pumping frames into a pipe), instead of starting a new process for each frame. Each process then opens its own VideoCapture from a video URL passed to it as an argument, instead of receiving a VideoCapture object. (I don't know whether it has the same pickle issue in this form.)

...
in main:
bestURLS = [best.url for best in streams]  # collect the URLs before starting the workers
proc, pipes = get_framesULJ(bestURLS, LOCK)



def get_frame2(videoURL, send_end):
    v = cv2.VideoCapture()
    v.open(videoURL)  # open the stream inside the worker process
    while True:
      ret, frame = v.read()
      if ret: send_end.send(frame)  # send the frame, not the capture object
      else: break

def get_framesULJ(videosURL, L): #return the processes, join in main and read the frames there
    print("get_framesULJ:",videosURL)
    jobs = []
    pipe_list = []
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        # get_frame2L is the variant of get_frame2 in the full code that also takes the lock
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()

    return jobs, pipe_list
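The idea above (one long-lived process per stream, only picklable arguments) can be tried without pafy or OpenCV. In this sketch the names pump and start_workers are hypothetical, (source_id, index) tuples stand in for frames, and a None sentinel marks the end of a source so the reader can stop cleanly.

```python
import multiprocessing


def pump(source_id, n_items, send_end):
    """Worker: produce items for one source and push them into its pipe.

    A real worker would open cv2.VideoCapture(url) here and send frames;
    this stand-in sends (source_id, index) tuples so the sketch runs anywhere.
    """
    for index in range(n_items):
        send_end.send((source_id, index))
    send_end.send(None)   # sentinel: tells the reader this source is finished
    send_end.close()


def start_workers(source_ids, n_items):
    """Start one process per source; return the processes and the receiving ends."""
    jobs, pipes = [], []
    for source_id in source_ids:
        recv_end, send_end = multiprocessing.Pipe(False)
        proc = multiprocessing.Process(target=pump, args=(source_id, n_items, send_end))
        proc.start()
        send_end.close()  # the parent keeps only the receiving end
        jobs.append(proc)
        pipes.append(recv_end)
    return jobs, pipes


if __name__ == "__main__":
    jobs, pipes = start_workers(["a", "b", "c"], n_items=3)
    while True:
        batch = [conn.recv() for conn in pipes]   # one item per source
        if any(item is None for item in batch):
            break                                  # a source ended
        print(batch)
    for proc in jobs:
        proc.join()
```

join() is safe here because every worker finishes on its own. With endless streams the workers would need a stop signal or terminate() instead.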

Original answer:

<multiprocessing.connection.PipeConnection object at 0x000000000D3C7D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3BD2E0>
Traceback (most recent call last):
  File "y.py", line 104, in <module>
    main()
  File "y.py", line 48, in main
    frames = get_frames(videos)
  File "y.py", line 80, in get_frames
    p.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'cv2.VideoCapture' object

Z:\>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

It fails inside p.start(), when the Process object (including its VideoCapture argument) is pickled for the child. The instances are created and the structures seem OK:

VIDEOS: [<VideoCapture 000000000D418710>, <VideoCapture 000000000D4186F0>, <VideoCapture 000000000D418B70>]
<multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3B62E0>
P =  <Process name='Process-1' parent=8892 initial>
JOBS, len [<Process name='Process-1' parent=8892 initial>] 1
RECV_END <multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90>

See the module pickle:

https://docs.python.org/3/library/pickle.html

It seems not everything can be "pickled".

What can be pickled and unpickled?

The following types can be pickled:

None, True, and False

integers, floating point numbers, complex numbers

strings, bytes, bytearrays

tuples, lists, sets, and dictionaries containing only picklable objects

functions defined at the top level of a module (using def, not lambda)

built-in functions defined at the top level of a module

classes that are defined at the top level of a module

instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details).
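The last item in that list suggests another route that I have not tried with OpenCV: wrap the capture in a class whose __getstate__ leaves the native handle out, so only the URL is pickled and the handle is recreated on first use in the receiving process. In this sketch the class name ReopenableSource is made up, and a threading.Lock stands in for the VideoCapture.

```python
import pickle
import threading


class ReopenableSource:
    """Picklable wrapper: carries the URL, recreates the native handle on demand."""

    def __init__(self, url):
        self.url = url
        self._handle = None   # created lazily, never pickled

    def handle(self):
        if self._handle is None:
            # Stand-in for cv2.VideoCapture(self.url); a lock is used here
            # only because it is unpicklable too and needs no OpenCV install.
            self._handle = threading.Lock()
        return self._handle

    def __getstate__(self):
        state = self.__dict__.copy()
        state["_handle"] = None   # drop the unpicklable part
        return state


if __name__ == "__main__":
    src = ReopenableSource("https://www.youtube.com/watch?v=tT0ob3cHPmE")
    src.handle()                              # the handle now exists in this process
    clone = pickle.loads(pickle.dumps(src))   # works: only the URL travels
    print(clone.url, clone._handle)
```

In effect this is the same fix as passing the URL string directly, just packaged as an object.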

Besides, it seems a bug related to OpenCV was causing this. One of the solutions given is to turn multiprocessing off...

Python multiprocess can't pickle opencv videocapture object

https://github.com/MVIG-SJTU/AlphaPose/issues/164

Fang-Haoshu commented on 17 Oct 2018

This bug is due to multi-processing in opencv. --sp disable multi-processing. BTW, can you tell me the version of opencv that you are using?

I guess it has something to do with locked memory or a similar native resource held by the object.

A workaround I would try is to first dump the pixels of the object as plain (raw) data, maybe with a header describing the size etc.
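A minimal sketch of that raw-data idea, assuming 8-bit pixels and a made-up 12-byte header of height, width and channel count (the names HEADER, pack_frame and unpack_frame are hypothetical):

```python
import struct

HEADER = struct.Struct("<III")   # height, width, channels as unsigned 32-bit ints


def pack_frame(height, width, channels, pixels):
    """Prefix raw 8-bit pixel bytes with a fixed-size shape header."""
    if len(pixels) != height * width * channels:
        raise ValueError("pixel buffer does not match the given shape")
    return HEADER.pack(height, width, channels) + pixels


def unpack_frame(blob):
    """Inverse of pack_frame: return (height, width, channels, pixels)."""
    height, width, channels = HEADER.unpack_from(blob)
    pixels = blob[HEADER.size:]
    if len(pixels) != height * width * channels:
        raise ValueError("truncated or corrupt frame")
    return height, width, channels, pixels


if __name__ == "__main__":
    # A 2x2 BGR image with every byte set to 7 stands in for a decoded frame.
    # With NumPy, pixels would be frame.tobytes(), and the receiver would use
    # np.frombuffer(pixels, dtype=np.uint8).reshape(height, width, channels).
    blob = pack_frame(2, 2, 3, bytes([7]) * 12)
    print(len(blob), unpack_frame(blob)[:3])
```

Note that the decoded frames are NumPy arrays and can already be sent through a pipe with send(), as the working version does. It is the VideoCapture object that cannot be pickled. A raw format like this would mainly be of interest together with send_bytes()/recv_bytes().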

Also, in general, I think some buffering needs to be added for smoother playback.
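What such buffering could look like, as a sketch under my own assumptions (the class name FrameBuffer and the capacity/prefill parameters are made up): a bounded queue per stream that only starts handing out frames once a few have accumulated.

```python
from collections import deque


class FrameBuffer:
    """Bounded FIFO that reports ready only once it has been prefilled."""

    def __init__(self, capacity, prefill):
        if not 0 < prefill <= capacity:
            raise ValueError("prefill must be between 1 and capacity")
        self._items = deque(maxlen=capacity)   # oldest item is dropped when full
        self._prefill = prefill
        self._started = False

    def push(self, item):
        self._items.append(item)
        if len(self._items) >= self._prefill:
            self._started = True

    def pop(self):
        """Return the oldest buffered item, or None while prefilling or empty."""
        if not self._started or not self._items:
            return None
        return self._items.popleft()


if __name__ == "__main__":
    buf = FrameBuffer(capacity=4, prefill=2)
    buf.push("f0")
    print(buf.pop())   # still prefilling
    buf.push("f1")
    print(buf.pop(), buf.pop(), buf.pop())
```

The main loop would push whatever arrives from a pipe and pop one frame per display tick. This version never re-enters the prefill phase after an underrun, which may or may not be what you want.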

BTW, what version is your OpenCV? Mine is 4.2.0.

Upvotes: 5
