mster3313

Reputation: 33

Broken pipe error when writing to stdin of a Python application from a Go application

I have implemented a worker pool to submit my jobs to the Python script.

func NewWorker(index int, workerConfig config.AppConfig, logger log.PrefixedLogger) error {
    var worker entities.Worker

    worker.ID = index
    worker.Config = workerConfig
    strCommand := workerConfig.RideModelScriptPath
    command := exec.Command(strCommand)

    stdIn, err := command.StdinPipe()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error resolving stdin pipe from command", err.Error())
        return err
    }
    worker.Stdin = stdIn

    stdout, err := command.StdoutPipe()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error resolving stdout pipe from command", err.Error())
        return err
    }
    worker.StdOutReader = bufio.NewReaderSize(stdout, workerConfig.MaxRequestSize)

    stderr, err := command.StderrPipe()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error resolving stderror pipe from command", err.Error())
        return err
    }
    worker.StdError = stderr

    err = command.Start()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error starting command", err.Error())
        return err
    }

    go processWorkerPool(&worker, ReqChan, logger)
    return err
}

When the shared channel receives a job, a worker consumes it and sends it to the Python script.

func processWorkerPool(worker *entities.Worker, ReqChannel chan entities.ReqMessage, logger log.PrefixedLogger) {

    for request := range ReqChannel {
        bufferLatency.Observe(float64(time.Since(request.SentTime).Nanoseconds()/1e6), map[string]string{"name": "buffer", "error": "false"})

        logger.Info("worker.processWorkerPool", request.Request)

        startTime := time.Now()

        //Send Request to Worker
        _, err := io.WriteString(worker.Stdin, request.Request)
        if err != nil {
            scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "true"})
            log.ErrorContext(context.Background(), log.WithPrefix("worker.processWorkerPool", err))
            return
        }

        //Get response from Worker
        result := CopyOutput(logger, worker.StdOutReader)

        scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "false"})
        request.ResponseChannel <- result
    }
}

To read the results from the Python script's stdout, I use the following helper function:

func CopyOutput(logger log.PrefixedLogger, r io.Reader) string {
    scanner := bufio.NewScanner(r)
    result := ""

    for scanner.Scan() {
        output := scanner.Text()

        switch {
        case strings.Contains(output, "ERROR"):
            errorMsg := strings.SplitAfter(output, "ERROR: ")[1]
            err := errors.New(errorMsg)
            logger.Error("worker.CopyOutput", "ERROR LOG: ", err.Error())
            return err.Error()
        case strings.Contains(output, "OUTPUT"):
            result = strings.SplitAfter(output, "OUTPUT: ")[1]
            logger.Debug("worker.copyOutput", "OUTPUT LOG: ", result)
            return result
        default:
            logger.Debug("worker.copyOutput", "DEBUG LOG: ", output)
        }
    }
    return result
}

On the Python end, my script looks like this:

#!/usr/bin/python3
import sys
import json
from threading import Thread

from vrpsolver.ride_model import rides_model
from preprocessor.config_loader import Config


# Load Configs
configs = Config('/opt/pool-processor/configs/configs.yaml')

while True:
    
    # input = json.loads(sys.argv[1])
    # model = sys.argv[2]
    # file = sys.argv[3]
    
    threads = []
    try:
        inputDataStream = sys.stdin.readline()
        inputDataStream = inputDataStream.strip()
        data = inputDataStream.split(' ')
        model = data[1]
    except (Exception) as ex:
        sys.stdout.write('ERROR: Error Occured while reading stdin: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue
    
    try:
        input = json.loads(data[0])
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error Occured while parsing data to json: {}\n'.format(str(ex)))
        continue

    try:
        result = rides_model(input, configs)
        sys.stdout.write('OUTPUT: {}\n'.format(json.dumps(result)))
        sys.stdout.flush()
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error Occured while processing: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue

When I run the program, after some time I get:

write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:76
write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:83

from the following lines:

_, err := io.WriteString(worker.Stdin, request.Request)
result := CopyOutput(logger, worker.StdOutReader)

I have been stuck on this for a while now, and any input is appreciated. My guess is that after some time the Python script crashes and, as a result, I get this error. I am not sure why that crash is not caught by the exception handlers.

Upvotes: 1

Views: 1429

Answers (1)

LeGEC

Reputation: 51850

The basic answer to this error: for some reason, your Python process has closed its STDIN (most likely because it has exited). Check why it exits so early.
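
To see where the "write |1: broken pipe" message comes from, here is a minimal sketch that reproduces it. It assumes a Unix-like system with `sh` available. The shell one-liner is only a stand-in for the Python script: it closes its own stdin and then prints a line so the parent knows when to write. The names `writeToClosedStdin` and `isBrokenPipe` are hypothetical helpers, not part of the question's code.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"os/exec"
	"syscall"
)

// writeToClosedStdin starts a stand-in child (a shell one-liner, NOT the
// real Python script) that closes its own stdin and then prints "closed".
// It returns the error produced by writing to the child's stdin afterwards.
func writeToClosedStdin() (writeErr, setupErr error) {
	cmd := exec.Command("sh", "-c", "exec 0<&-; echo closed")

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}

	// Block until the child reports that its end of the stdin pipe is gone.
	if _, err := bufio.NewReader(stdout).ReadString('\n'); err != nil {
		return nil, err
	}

	// Nobody is reading the pipe any more, so this write is expected to fail.
	_, writeErr = io.WriteString(stdin, "job\n")

	stdin.Close()
	cmd.Wait() // reap the child; all reads from stdout are finished by now
	return writeErr, nil
}

// isBrokenPipe reports whether err wraps EPIPE ("broken pipe").
func isBrokenPipe(err error) bool {
	return errors.Is(err, syscall.EPIPE)
}

func main() {
	writeErr, err := writeToClosedStdin()
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	fmt.Println("write error:", writeErr)
	fmt.Println("is broken pipe:", isBrokenPipe(writeErr))
}
```

The write itself succeeds or fails independently of what you sent: the error only tells you that nothing is reading on the other end any more, which is why the next step is to find out why the child went away.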


Some reasons why it is hard for you to see what your Python process does:

  • its main "activity log" is on sys.stdout,
  • stdout is captured and processed by your Go program (and as a side effect, it is not printed to the console),
  • there are some issues in how you handle the subprocess output in Go.

To make debugging easier, I would advise you to have your Python script also write its output to a log file.
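
If you would rather not touch the script, a similar effect can be approximated on the Go side. This is an alternative to the advice above, not the same thing: it only records what reaches the Go process. The sketch below wraps the reader in `io.TeeReader` so that every byte the scanner consumes is also copied to a log destination. `teeLines` and `teeDemo` are hypothetical names, and an in-memory buffer stands in for a real log file (for which you would typically pass an `*os.File`).

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

// teeLines scans src line by line while copying every byte it reads
// into logDst, so the raw child output is preserved for debugging.
func teeLines(src io.Reader, logDst io.Writer) []string {
	scanner := bufio.NewScanner(io.TeeReader(src, logDst))
	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	return lines
}

// teeDemo feeds a fixed string through teeLines; the string stands in for
// the child's stdout and the buffer stands in for a log file.
func teeDemo(input string) (lines []string, logged string) {
	var logBuf bytes.Buffer
	lines = teeLines(strings.NewReader(input), &logBuf)
	return lines, logBuf.String()
}

func main() {
	lines, logged := teeDemo("DEBUG: loading\nOUTPUT: {}\n")
	fmt.Println(len(lines), "lines parsed")
	fmt.Printf("log copy: %q\n", logged)
}
```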


The first three issues I see are:

  • you set a StderrPipe on your Python process, but it is never read, so STDERR is completely silenced

Either set command.Stderr = os.Stderr instead of calling StderrPipe() so that stderr is printed on the console (if command.Stderr is left nil, os/exec connects it to the null device), or at least add an extra goroutine to drain the pipe and print its content somewhere (your Go process's stderr, a log file, ...).

For example:

go func() {
    // Forward everything the child writes to stderr to our own stderr.
    io.Copy(os.Stderr, worker.StdError)
}()

  • to read the child process output as lines of text, you repeatedly create a new bufio.Scanner over the Stdout pipe

When you run bufio.NewScanner(...), a new Scanner with its own buffer is created. If you discard it and create a new Scanner, the previous buffer is thrown away, and you don't know how many bytes have already been read from the underlying io.Reader (some may have been buffered ...).

At the very least, you should instantiate your bufio.Scanner only once (in processWorkerPool()), and repeatedly call scanner.Scan() on that single *bufio.Scanner instance, so that the same buffer gets used.
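
The difference can be shown with a small sketch. It uses an in-memory `strings.Reader` as a stand-in for the child's stdout, and the helper names (`nextLine`, `readWithNewScanners`, `readWithSharedScanner`, `demo`) are hypothetical. With a real pipe the outcome of the per-call variant depends on timing, so treat this as an illustration of the mechanism rather than a reproduction of your exact failure.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// nextLine returns the next line from scanner, or "" when no line is left.
func nextLine(scanner *bufio.Scanner) string {
	if scanner.Scan() {
		return scanner.Text()
	}
	return ""
}

// readWithNewScanners mimics the question's CopyOutput: a fresh Scanner is
// created for every response, so bytes buffered by the previous one are lost.
func readWithNewScanners(r io.Reader, n int) []string {
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, nextLine(bufio.NewScanner(r)))
	}
	return out
}

// readWithSharedScanner creates the Scanner once and reuses it, so lines
// that were read ahead stay available for the next call.
func readWithSharedScanner(r io.Reader, n int) []string {
	scanner := bufio.NewScanner(r)
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, nextLine(scanner))
	}
	return out
}

// demo runs both variants over the same simulated child output.
func demo(childOutput string) (perCall, shared []string) {
	perCall = readWithNewScanners(strings.NewReader(childOutput), 2)
	shared = readWithSharedScanner(strings.NewReader(childOutput), 2)
	return perCall, shared
}

func main() {
	perCall, shared := demo("OUTPUT: 1\nOUTPUT: 2\n")
	fmt.Printf("new Scanner per read: %q\n", perCall)
	fmt.Printf("one shared Scanner:   %q\n", shared)
}
```

With this in-memory input, the first throwaway Scanner pulls both lines into its buffer, so the second one finds nothing left and returns an empty string, while the shared Scanner returns both lines. In the worker this would mean storing the `*bufio.Scanner` on the worker (or creating it once at the top of processWorkerPool()) and passing it to CopyOutput instead of the raw reader.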

  • you should somehow monitor the state of the running process

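Keep a reference to command, call command.Wait() (for example in a separate goroutine), and inspect command.ProcessState, which is populated once Wait returns, to find out whether and how your external command has exited.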
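
One possible shape for this, as a sketch only: `watch` and `exitCodeOf` are hypothetical helpers, and `sh -c "exit 3"` is a stand-in for a Python script that dies unexpectedly (so this assumes a Unix-like system). The goroutine calls `cmd.Wait()` and reports the result on a channel, after which `cmd.ProcessState` can be read safely.

```go
package main

import (
	"fmt"
	"os/exec"
)

// watch starts cmd and returns a channel that delivers the result of
// cmd.Wait() once the process has exited.
func watch(cmd *exec.Cmd) (<-chan error, error) {
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait() // Wait also fills in cmd.ProcessState
	}()
	return done, nil
}

// exitCodeOf runs a command to completion and reports its exit code.
func exitCodeOf(name string, args ...string) (int, error) {
	cmd := exec.Command(name, args...)
	done, err := watch(cmd)
	if err != nil {
		return -1, err
	}
	// A long-lived worker would select on this channel next to its
	// request channel instead of blocking here.
	<-done
	return cmd.ProcessState.ExitCode(), nil
}

func main() {
	code, err := exitCodeOf("sh", "-c", "exit 3")
	if err != nil {
		fmt.Println("could not start:", err)
		return
	}
	fmt.Println("child exited with code", code)
}
```

One caveat when combining this with StdoutPipe(), as in the question: Wait closes the pipes once it sees the process exit, and the os/exec documentation says it is incorrect to call Wait before all reads from the pipe have completed. In a worker you would therefore drain stdout and stderr to EOF first and only then call Wait.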

Upvotes: 1
