Reputation: 11
I'm performing a bunch of operations over SSH on a remote machine, streaming its stdout and stderr, and then consuming them with a writer that writes to the local stdout and stderr, along with byte buffers.
Just before the writer consumes the output, I want to perform a series of string manipulations on it and then write to my screen and buffer. Up to this point, it all works fine and dandy.
My issue is that now it's not a stream anymore; it hangs and then outputs the whole blob in one chunk. I want it to be real time, so I put channels in my goroutines, but with no improvement. Below are my functions; let me know if you can spot the reason why, or possibly a better way of achieving this.
// sending
func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, readers chan io.Reader) {
    if filters != nil {
        // filters exist
        // read first 8 bytes
        res := readPipe(8, pipe)
        // get each line from the resulting streamed output
        for _, str := range strings.Split(res, "\n") {
            if str != "" {
                out := lineFilterAndReplace(str, filters, replaceFilters)
                // instantiate an io.Reader obj from the given string
                outReader := strings.NewReader(out)
                readers <- outReader
            }
        }
    } else {
        // filters don't exist
        if len(replaceFilters) > 0 {
            res := readPipe(8, pipe)
            for _, str := range strings.Split(res, "\n") {
                if str != "" {
                    out := lineReplace(str, replaceFilters)
                    // instantiate an io.Reader obj from the given string
                    outReader := strings.NewReader(out)
                    readers <- outReader
                }
            }
        } else {
            readers <- pipe
        }
    }
}
// receiving
outReaders := make(chan io.Reader)
go handleStdStream(outFilters, replaceFilters, stdoutIn, outReaders)
go func() {
    for {
        pipe := <-outReaders
        _, errStdout = io.Copy(outWriter, pipe)
    }
    // _, errStdout = io.Copy(outWriter, stdoutIn)
}()
Upvotes: 1
Views: 559
Reputation: 1734
I don't think you need channels or goroutines to accomplish this. The Writer and Reader interfaces are already streaming; you sip bytes from a Reader continuously until you hit EOF or an error, and you hand off bytes to a Writer continuously until you're done or you get an error. On its own, processing a stream does not require any concurrency, so doing this sequentially in a single goroutine is quite appropriate.
You shouldn't ignore error returns. If a function or method returns an error value, you need to check it. In the case of I/O, you usually need to stop reading from a Reader when it returns an error, and you usually need to stop writing to a Writer when it returns an error. In the case of a Reader you also have to check for the special "error" value io.EOF.
I think using Scanner from the bufio package is better than trying to do your own buffering/splitting. By default, Scanner splits input on newlines (Unix-style LF or DOS-style CRLF). It also gets rid of the need to check for io.EOF, provided you only interact with the Reader through the Scanner.
Consider the following version of handleStdStream:
func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, w io.Writer) error {
    scanner := bufio.NewScanner(pipe)
    for scanner.Scan() {
        str := scanner.Text()
        if str == "" {
            continue
        }
        var out string
        if len(filters) != 0 {
            out = lineFilterAndReplace(str, filters, replaceFilters)
        } else {
            out = lineReplace(str, replaceFilters)
        }
        // Scanner strips the trailing newline, so add it back when writing.
        if _, err := w.Write([]byte(out + "\n")); err != nil {
            return err
        }
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    return nil
}
You would use it like this:
err := handleStdStream(filters, replaceFilters, pipe, outWriter)
if err != nil {
    // do something, like printing the error to a log or stderr
}
Upvotes: 1