Reputation: 33
I have implemented a worker pool that submits jobs to a Python script.
func NewWorker(index int, workerConfig config.AppConfig, logger log.PrefixedLogger) error {
	var worker entities.Worker
	worker.ID = index
	worker.Config = workerConfig

	strCommand := workerConfig.RideModelScriptPath
	command := exec.Command(strCommand)

	stdIn, err := command.StdinPipe()
	if err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error resolving stdin pipe from command", err.Error())
		return err
	}
	worker.Stdin = stdIn

	stdout, err := command.StdoutPipe()
	if err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error resolving stdout pipe from command", err.Error())
		return err
	}
	worker.StdOutReader = bufio.NewReaderSize(stdout, workerConfig.MaxRequestSize)

	stderr, err := command.StderrPipe()
	if err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error resolving stderr pipe from command", err.Error())
		return err
	}
	worker.StdError = stderr

	if err = command.Start(); err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error starting command", err.Error())
		return err
	}

	go processWorkerPool(&worker, ReqChan, logger)
	return nil
}
When the shared channel receives a job, it is consumed and sent to the Python script.
func processWorkerPool(worker *entities.Worker, ReqChannel chan entities.ReqMessage, logger log.PrefixedLogger) {
	for request := range ReqChannel {
		bufferLatency.Observe(float64(time.Since(request.SentTime).Nanoseconds()/1e6), map[string]string{"name": "buffer", "error": "false"})
		logger.Info("worker.processWorkerPool", request.Request)
		startTime := time.Now()

		// Send request to worker
		_, err := io.WriteString(worker.Stdin, request.Request)
		if err != nil {
			scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "true"})
			log.ErrorContext(context.Background(), log.WithPrefix("worker.processWorkerPool", err))
			return
		}

		// Get response from worker
		result := CopyOutput(logger, worker.StdOutReader)
		scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "false"})
		request.ResponseChannel <- result
	}
}
To read the results from the Python script's stdout I use the following helper function:
func CopyOutput(logger log.PrefixedLogger, r io.Reader) string {
	scanner := bufio.NewScanner(r)
	result := ""
	for scanner.Scan() {
		output := scanner.Text()
		switch {
		case strings.Contains(output, "ERROR"):
			errorMsg := strings.SplitAfter(output, "ERROR: ")[1]
			err := errors.New(errorMsg)
			logger.Error("worker.CopyOutput", "ERROR LOG: ", err.Error())
			return err.Error()
		case strings.Contains(output, "OUTPUT"):
			result = strings.SplitAfter(output, "OUTPUT: ")[1]
			logger.Debug("worker.CopyOutput", "OUTPUT LOG: ", result)
			return result
		default:
			logger.Debug("worker.CopyOutput", "DEBUG LOG: ", output)
		}
	}
	return result
}
On the Python end my script looks like this:
#!/usr/bin/python3
import sys
import json
from threading import Thread
from vrpsolver.ride_model import rides_model
from preprocessor.config_loader import Config

# Load Configs
configs = Config('/opt/pool-processor/configs/configs.yaml')

while True:
    # input = json.loads(sys.argv[1])
    # model = sys.argv[2]
    # file = sys.argv[3]
    threads = []
    try:
        inputDataStream = sys.stdin.readline()
        inputDataStream = inputDataStream.strip()
        data = inputDataStream.split(' ')
        model = data[1]
    except Exception as ex:
        sys.stdout.write('ERROR: Error occurred while reading stdin: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue
    try:
        input = json.loads(data[0])
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error occurred while parsing data to json: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue
    try:
        result = rides_model(input, configs)
        sys.stdout.write('OUTPUT: {}\n'.format(json.dumps(result)))
        sys.stdout.flush()
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error occurred while processing: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue
When I run the program, after some time I get
write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:76
write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:83
from the following lines
_, err := io.WriteString(worker.Stdin, request.Request)
result := CopyOutput(logger, worker.StdOutReader)
I am stuck on this for a while now and any input on this is appreciated. My guess is that after some time the Python script crashes and as a result I get this error. I am not sure why that crash is not caught by the exception handlers.
Upvotes: 1
Views: 1429
Reputation: 51850
The basic answer to this error is: for some reason, your Python process has closed its stdin (it has probably exited), so check why it exits too early.
Some elements on why it's hard for you to see what your Python process does: its stdout is caught and processed by your Go program (and a side effect is that it does not get printed to the console). To make debugging easier, I would advise you to have your Python script also write its output to a log file.
The first three issues I see are:
Try not to redirect stderr (you should see stderr being printed on the console), or at least add an extra goroutine to drain and print its content somewhere (on your Go process's stderr, in a log file ...), e.g.:
go func() {
	io.Copy(os.Stderr, worker.StdError)
}()
Don't create a new bufio.Scanner over the stdout pipe for each request. When you run bufio.NewScanner(...), a new buffered reader with a new buffer is created. If you discard it and create a new Scanner, the previous buffer gets discarded, and you don't know how many bytes have been read from the underlying io.Reader (some may have been buffered ...).
At the very least, you should instantiate your bufio.NewScanner() only once (in processWorkerPool()), and repeatedly call scanner.Scan() on that single *bufio.Scanner instance, so that the same buffer gets used.
Keep a way to access command.ProcessState, and check whether your external command has completed.
Upvotes: 1