Norman Edance
Norman Edance

Reputation: 392

Non-blocking realtime read from multiple shell subprocesses (Python)

I'm building real time multiple videostream monitoring using ffmpeg and subrocess. I currently have the following code, inspired by "Async and await with subprocesses" post.

The problem is that after a certain period of time the output stops printing and the processes go into zombie mode. I guess that this problem is related to the overload of PIPE or deadlock. Help needed.

"""Async and await example using subprocesses

Note:
    Requires Python 3.6.
"""

import os
import sys
import time
import platform
import asyncio

async def run_command_shell(command):
    """Run command in subprocess (shell)

    Note:
        This can be used if you wish to execute e.g. "copy"
        on Windows, which can only be executed in the shell.
    """
    # Create subprocess
    process = await asyncio.create_subprocess_shell(
        command,
        stderr=asyncio.subprocess.PIPE)

    # Status
    print('Started:', command, '(pid = ' + str(process.pid) + ')')

    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()

    # Progress
    if process.returncode == 0:
        print('Done:', command, '(pid = ' + str(process.pid) + ')')
    else:
        print('Failed:', command, '(pid = ' + str(process.pid) + ')')

    # Result
    result = stderr.decode().strip()

    # Real time print
    print(result)

    # Return stdout
    return result


def make_chunks(l, n):
    """Yield successive n-sized chunks from l.

    Note:
        Taken from https://stackoverflow.com/a/312464
    """
    if sys.version_info.major == 2:
        for i in xrange(0, len(l), n):
            yield l[i:i + n]
    else:
        # Assume Python 3
        for i in range(0, len(l), n):
            yield l[i:i + n]


def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results

    If max_concurrent_tasks are set to 0, no limit is applied.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """

    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
    else:
        chunks = make_chunks(l=tasks, n=max_concurrent_tasks)

    for tasks_in_chunk in chunks:
        if platform.system() == 'Windows':
            loop = asyncio.ProactorEventLoop()
            asyncio.set_event_loop(loop)
        else:
            loop = asyncio.get_event_loop()

        commands = asyncio.gather(*tasks_in_chunk)  # Unpack list using *
        results = loop.run_until_complete(commands)
        all_results += results
        loop.close()
    return all_results


if __name__ == '__main__':

    start = time.time()

    if platform.system() == 'Windows':
        # Commands to be executed on Windows
        commands = [
            ['hostname']
        ]
    else:
        # Commands to be executed on Unix
        commands = [
            ['du', '-sh', '/var/tmp'],
            ['hostname'],
        ]
    cmds = [["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx  -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"],
            ["ffmpeg -y -i udp://xxx.xx.xx.xxx:xxxx -f null -"]]

    tasks = []
    for command in cmds:
        tasks.append(run_command_shell(*command))


    # # Shell execution example
    # tasks = [run_command_shell('copy c:/somefile d:/new_file')]

    # # List comprehension example
    # tasks = [
    #     run_command(*command, get_project_path(project))
    #     for project in accessible_projects(all_projects)
    # ]

    results = run_asyncio_commands(tasks, max_concurrent_tasks=20)  # At most 20 parallel tasks
    print('Results:', results)

    end = time.time()
    rounded_end = ('{0:.4f}'.format(round(end-start,4)))
    print('Script ran in about', str(rounded_end), 'seconds')

Related: Non-blocking read from multiple subprocesses (Python)

Upvotes: 1

Views: 928

Answers (1)

Norman Edance
Norman Edance

Reputation: 392

It turned out that the problem is probably not related to code optimization through multithreading, asyncio, etc.

The reason may be server limitations, such as maximum number of open files / file descriptors (FD), firewall, other config files.

If you stumbled upon a similar problem:


Install htop

Htop is an interactive real time process monitoring application for Linux/Unix like systems and also a handy alternative to top command, which is default process monitoring tool that comes with pre-installed on all Linux operating systems.

This may be useful for clarifying the reasons.


Test single ffmpeg command

As jfs said, I need Minimal, Complete, and Verifiable example . So we start with a very minimal: test one process.

ffmpeg -y -i udp://224.10.0.123:1234  -f null -

In my case it turned out that any multicast will hang in 2:10 - 2:20 minutes. Process alive but in zombie mode. This is very strange, because a couple of days ago everything worked perfectly.


Test another soft (VLC's multicat)

The latest official version of Multicat is numbered 2.2, and is available here.

Get it and don't forget, that biTStream needs to be installed at build-time.

Check the stream using the command to record video from the stream:

timeout 10 multicat -uU @224.10.0.123:1234 test.ts

In my case, the same thing happened on the 2nd minute. The command does not stop executing, but the file ceased to be recorded.


Check Maximum number of open files / file descriptors more info

Use the following command command to display maximum number of open file descriptors:

cat /proc/sys/fs/file-max

To see the hard and soft values, issue the command as follows:

ulimit -Hn
ulimit -Sn

At some point in the execution of one of my python scripts, I saw a similar error, but the increase in this parameter did not help me.


Summary

So the problem is not related to the parallel execution of my scripts. Verification on another virtual machine was successful. I contacted the person who set up this virtual machine and explained to him that something broke during the last couple of days, suggested that the problem is in the firewall. He said that he did not touch anything. But after this call everything began to work perfectly. (I'm almost sure that he broke it) :D

GL everyone!

Upvotes: 1

Related Questions