Wolkenarchitekt

Reputation: 21248

How to use Asyncio to stream process data between 3 subprocesses (using pipes) and consume the resulting data

I have 3 scripts that need to be combined in order to process data in a pipeline. The scripts run forever, until the user interrupts them. This is how they are executed inside a terminal:

script1_producer.sh | script2_processor.sh | script3_processor.sh

script1_producer.sh produces the data to be processed (as an example, it just prints incrementing numbers):

i=1
while true; do
  echo $i
  i=$(($i+1))
  sleep 1
done

script2_processor.sh consumes data from Script1 and calculates a new stream of data (multiplying each number by 2):

while read -r line
do
  echo "$(($line*2))"
done < "${1:-/dev/stdin}"

script3_processor.sh consumes data from Script2 and calculates a new stream of data (prepending the letter "A" to each number):

while read -r line
do
  echo "A$(($line))"
done < "${1:-/dev/stdin}"

The resulting output when running script1_producer.sh | script2_processor.sh | script3_processor.sh:

A2
A4
A6
...

Now I would like to control these scripts from Python using subprocesses and pipes. In the end I need to process the output of script3_processor.sh and perform operations on each line. I'm trying to implement this using asyncio, though a solution without asyncio would also be fine.

This is my (very naive) attempt, process_pipes.py:

import asyncio
import subprocess
import os


async def async_receive():
    p1 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=subprocess.PIPE,
    )

    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
    )

    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=p2.stdout,
        stdout=subprocess.PIPE,
    )

    # Read just one line to test
    data = await p3.stdout.readline()
    print(data)


asyncio.run(async_receive())

Unfortunately, I'm getting the following exception when executing this script:

Traceback (most recent call last):
  File "process_pipes.py", line 28, in <module>
    asyncio.run(async_receive())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "process_pipes.py", line 12, in async_receive
    p2 = await asyncio.create_subprocess_exec(
  File "/usr/lib/python3.8/asyncio/subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1630, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 197, in _make_subprocess_transport
    transp = _UnixSubprocessTransport(self, protocol, args, shell,
  File "/usr/lib/python3.8/asyncio/base_subprocess.py", line 36, in __init__
    self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 789, in _start
    self._proc = subprocess.Popen(
  File "/usr/lib/python3.8/subprocess.py", line 808, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "/usr/lib/python3.8/subprocess.py", line 1477, in _get_handles
    p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'

I read some examples on Stack Overflow and elsewhere that handle the pipes differently, but I could not get them to work in my scenario.

How can I mimic running script1_producer.sh | script2_processor.sh | script3_processor.sh and process the output of script3 in Python?

Upvotes: 1

Views: 1337

Answers (2)

Gealber

Reputation: 492

I found another solution, guided by this question:

  1. Connect two processes started with asyncio.subprocess.create_subprocess_exec()

Before that, two remarks. First, the reason for your exception: with stdout=subprocess.PIPE, p1.stdout is an asyncio StreamReader, not a real file object, so it has no fileno() and cannot be passed directly as the stdin of the next process. Second, spacing in arithmetic expansions like echo "$(($line*2))" is optional in bash; echo "$(( $line * 2 ))" behaves the same and is just easier to read. Apart from that, the scripts are fine.

One thing to remember here is that a pipe has two ends: one to read from and one to write to. So for the first process it looks like this sketch:

  • Write end(WE)
  • Read end(RE)
p0 ---> | pipe 1 | ---> p2
       WE        RE

You should use an OS-level pipe from os.pipe(), as described in the question referenced above. This part would look like this:

    read1, write1 = os.pipe()
    p0 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=write1
    )

Here stdout is the WE of the pipe, while for p2 we have

| pipe 1 | ---> p2 -------> | pipe 2 |
WE       RE=stdin  stdout=WE

Here stdin is the RE of the first pipe, and stdout the WE of the second pipe:

    read2, write2 = os.pipe()
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=read1,
        stdout=write2,
    )

And for the third process:

| pipe 2 | ---> p3 -------> | asyncio PIPE |
WE       RE=stdin  stdout=PIPE

Joining it all together we have:

import asyncio
import os


async def async_receive():
    read1, write1 = os.pipe()
    p0 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=write1,
    )

    read2, write2 = os.pipe()
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=read1,
        stdout=write2,
    )

    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=read2,
        stdout=asyncio.subprocess.PIPE,
    )

    # The children have inherited the pipe fds; close the parent's
    # copies so each script sees EOF when its upstream exits.
    for fd in (read1, write1, read2, write2):
        os.close(fd)

    # Consume the output of the last process line by line
    while True:
        data = await p3.stdout.readline()
        data = data.decode('ascii').rstrip()
        print(data)
        print("Sleeping 1 sec...")
        await asyncio.sleep(1)


asyncio.run(async_receive())

This way you can still use asyncio.
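If you prefer asyncio to own every pipe instead of juggling raw os.pipe() descriptors, another option (not part of the original answer) is to give every process asyncio.subprocess.PIPE and copy data between them with a small pump coroutine. The sketch below uses seq and cat as stand-ins for the three scripts so it is self-contained; with the real scripts the wiring is identical:

```python
import asyncio


async def pump(src, dst):
    # Copy lines from one process's stdout to the next one's stdin.
    while True:
        line = await src.readline()
        if not line:          # EOF: the upstream process exited
            break
        dst.write(line)
        await dst.drain()
    dst.close()               # propagate EOF downstream


async def main():
    # 'seq' and 'cat' stand in for the three shell scripts.
    p1 = await asyncio.create_subprocess_exec(
        "seq", "1", "3", stdout=asyncio.subprocess.PIPE)
    p2 = await asyncio.create_subprocess_exec(
        "cat", stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    p3 = await asyncio.create_subprocess_exec(
        "cat", stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)

    # Run the pumps in the background while we consume p3's output.
    pumps = [
        asyncio.ensure_future(pump(p1.stdout, p2.stdin)),
        asyncio.ensure_future(pump(p2.stdout, p3.stdin)),
    ]

    lines = []
    while True:
        data = await p3.stdout.readline()
        if not data:
            break
        lines.append(data.decode().rstrip())
    await asyncio.gather(*pumps)
    return lines


result = asyncio.run(main())
print(result)
```

The trade-off versus the os.pipe() version: data passes through the Python process (slightly more overhead), but in exchange you can inspect or transform it between stages.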

Upvotes: 3

Wolkenarchitekt

Reputation: 21248

This is how to solve the problem without asyncio: just use Popen with shell=True and put the pipes into the command:

import subprocess


def receive():
    p = subprocess.Popen(
        "./script1_producer.sh "
        "| ./script2_processor.sh "
        "| ./script3_processor.sh",
        stdout=subprocess.PIPE, shell=True)

    # Iterating over p.stdout blocks until a line arrives and stops
    # cleanly at EOF, instead of busy-looping on empty reads.
    for line in p.stdout:
        print(line.decode().strip())


if __name__ == '__main__':
    receive()
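One caveat with shell=True: p.terminate() only signals the shell, not the scripts it spawned, so the pipeline can keep running after you think you stopped it. A sketch of one way to shut the whole thing down (using start_new_session and os.killpg; this is an addition, not part of the original answer, and it uses yes | cat | cat as a stand-in for the never-ending pipeline):

```python
import os
import signal
import subprocess

# 'yes | cat | cat' stands in for the real three-script pipeline.
p = subprocess.Popen(
    "yes | cat | cat",
    stdout=subprocess.PIPE,
    shell=True,
    start_new_session=True,  # new process group: the shell and all its children
)

first = p.stdout.readline().decode().strip()

# p.terminate() would only signal the shell; kill the whole group instead.
os.killpg(os.getpgid(p.pid), signal.SIGTERM)
p.wait()
print(first)
```

With start_new_session=True the shell becomes the leader of a fresh process group, so os.killpg reaches every stage of the pipeline at once.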

Upvotes: 1
