Andre Rogers
Andre Rogers

Reputation: 11

Streaming Stdout and Stderr over SSH, manipulate the stream and then print to local Stdout, and Stderr

I'm performing a bunch of operations over SSH on a remote machine and I'm streaming its stdout and stderr and then consuming it to by a writer, which writes to the local stdout and stderr, along with byte buffers.

Just before the writer consumes it, I want to perform a series of string manipulations on it and then write to my screen and buffer. Up to this point, it all works fine and dandy.

My issue is now it's not a stream anymore, it hangs and then outputs the whole glob in one chunk. I want it to be real time, so I put channels in my go routines but with no improvement. Below are my functions, let me know if you can spot a reason why, or possibly a better way of achieving this.

// sending

func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, readers chan io.Reader) {
        if filters != nil {
                // filters exist
                // read first 8 bytes
                res := readPipe(8, pipe)

                // get each line from the resulting streamed output
                for _, str := range strings.Split(res, "\n") {
                        if str != "" {
                                out := lineFilterAndReplace(str, filters, replaceFilters)

                                // instantiate an io.Reader obj from the given string
                                outReader := strings.NewReader(out)

                                readers <- outReader
                        }
                }
        } else {
                // filters dont exist
                if len(replaceFilters) > 0 {
                        res := readPipe(8, pipe)

                        for _, str := range strings.Split(res, "\n") {
                                if str != "" {
                                        out := lineReplace(str, replaceFilters)

                                        // instantiate an io.Reader obj from the given string
                                        outReader := strings.NewReader(out)

                                        readers <- outReader
                                }
                        }
                } else {
                        readers <- pipe
                }
        }
}

// recieving

    outReaders := make(chan io.Reader)

    go handleStdStream(outFilters, replaceFilters, stdoutIn, outReaders)

    go func() {
            for {
                    pipe := <-outReaders

                    _, errStdout = io.Copy(outWriter, pipe)
            }

            // _, errStdout = io.Copy(outWriter, stdoutIn)
    }()

Upvotes: 1

Views: 559

Answers (1)

kbolino
kbolino

Reputation: 1734

I don't think you need channels or goroutines to accomplish this. The Writer and Reader interfaces are already streaming; you sip bytes from a Reader continuously until you hit EOF or an error and you hand off bytes to a Writer continuously until you're done or you get an error. On its own, processing a stream does not require any concurrency, so doing this sequentially in a single goroutine is quite appropriate.

You shouldn't ignore error returns. If a function or method returns an error value, you need to check it. In the case of I/O, you usually need to stop reading from a Reader when it returns an error and you usually need to stop writing to a Writer when it returns an error. In the case of a Reader you also have to check for the special "error" value io.EOF.

I think using Scanner from the bufio package is better than trying to do your own buffering/splitting. By default, Scanner splits input on newlines (Unix-style LF or DOS-style CRLF). It also gets rid of the need to check for io.EOF, provided you only interact with the Reader through the Scanner.

Consider the following version of handleStdStream:

func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, w io.Writer) error {
    scanner := bufio.NewScanner(pipe)
    for scanner.Scan() {
        str := scanner.Text()
        if str == "" {
            continue
        }
        out := ""
        if len(filters) != 0 {
            out = lineFilterAndReplace(str, filters, replaceFilters)
        } else {
            out = lineReplace(str, replaceFilters)
        }
        if _, err := w.Write([]byte(out)); err != nil {
            return err
        }
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    return nil
}

You would use it like this:

err := handleStdStream(filters, replaceFilters, pipe, outWriter)
if err != nil {
    // do something, like printing the error to a log or stderr
}

Upvotes: 1

Related Questions