I have a server that, upon receiving a request, uses goroutines to read messages from different streams and send them to the parent goroutine. The parent goroutine aggregates the messages and sends them to the client. The layout looks like this:
```
Client --> API --> handler --> worker1
                          |--> worker2
                          |--> worker3
```
I use separate channels to communicate between these goroutines: the workers write to the aggregated channel, and the handler (the parent goroutine) receives from it and sends the messages back to the client. So everything works, ALMOST :)
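The worker-to-parent aggregation described above is the usual fan-in pattern. Below is a minimal sketch, assuming in-memory message slices in place of the real pod log streams. `fanInWorker` and `fanIn` are hypothetical names, not part of the original code. Each worker sends on one shared channel, and a `sync.WaitGroup` closes that channel once every worker has returned, so the parent's `range` loop ends.

```go
package main

import (
	"fmt"
	"sync"
)

// fanInWorker is a hypothetical stand-in for one pod-log reader: it sends
// each of its messages on the shared out channel, then returns.
func fanInWorker(name string, msgs []string, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, m := range msgs {
		out <- name + ": " + m
	}
}

// fanIn starts one worker per input and returns every message the parent
// goroutine received on the aggregated channel.
func fanIn(inputs map[string][]string) []string {
	out := make(chan string)
	var wg sync.WaitGroup
	for name, msgs := range inputs {
		wg.Add(1)
		go fanInWorker(name, msgs, out, &wg)
	}
	// Close the aggregated channel once every worker has returned, so the
	// range loop below terminates.
	go func() {
		wg.Wait()
		close(out)
	}()

	var received []string
	for m := range out {
		received = append(received, m) // the real handler would forward m to the client here
	}
	return received
}

func main() {
	got := fanIn(map[string][]string{
		"worker1": {"a", "b"},
		"worker2": {"c"},
		"worker3": {"d", "e"},
	})
	fmt.Println(len(got)) // 5; the order of messages depends on scheduling
}
```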
The problem is that my workers never receive the message the handler sends on the channel, so the worker goroutines never stop, and I cannot figure out why.
Since the code is huge, I have only included the relevant parts.
Note:
The `loglistenmgm` channel, which the workers need to receive from, has a buffer the same size as the number of workers. BUT if I remove the buffer, the info log after writing `true` to it never gets printed (which means no goroutine is listening), and the logs hang at these two lines:
```
INFO[0010] Request.Context().Done triggered
INFO[0010] **Sending true event to loglistenmgm: 0
```
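This buffered/unbuffered difference follows from Go's channel semantics. A send on a buffered channel completes as soon as a slot is free, whether or not any goroutine is receiving. A send on an unbuffered channel blocks until a receiver takes the value. So the `**After sending true event to loglistenmgm` line being printed only shows that the value went into the buffer, not that a worker received it. The small sketch below uses a hypothetical `trySend` helper, a `select` with a `default` case, to test whether a send would complete without blocking.

```go
package main

import "fmt"

// trySend reports whether a send on ch completes without blocking. In a
// select with a default case, default runs only when no other case is ready,
// i.e. when the buffer is full (or absent) and no receiver is waiting.
func trySend(ch chan bool) bool {
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan bool)
	buffered := make(chan bool, 3) // e.g. one slot per worker, like loglistenmgm

	fmt.Println(trySend(unbuffered)) // false: no goroutine is receiving
	fmt.Println(trySend(buffered))   // true: the value waits in the buffer, unreceived
}
```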
My handler:
```go
func handler() {
	logStream := make(chan string)
	done := make(chan bool, 5)
	loglistenmgm := make(chan bool, numberOfPods)
	errorStream := make(chan error)
	for _, pod := range pods {
		go getOnePodLogs(logStream, errorStream, loglistenmgm, done)
	}
	go func() {
		for {
			select {
			case <-c.Request.Context().Done():
				log.Info("Request.Context().Done triggered")
				for i := 0; i < numberOfPods; i++ {
					log.Info("**Sending true event to loglistenmgm: " + strconv.Itoa(i))
					loglistenmgm <- true
					log.Info("**After sending true event to loglistenmgm") // This gets printed, so true has been sent to the channel
				}
				done <- true
				return
			case err := <-errorStream:
				c.JSON(http.StatusInternalServerError, map[string]string{
					"message": "Internal Server Error: " + err.Error(),
				})
				for i := 0; i < numberOfPods; i++ {
					loglistenmgm <- true
				}
				done <- true
				return
			}
		}
	}()
	isStreaming := c.Stream(func(w io.Writer) bool {
		for {
			select {
			case <-done:
				c.SSEvent("end", "end") // This also works: "stream closed", which is logged after this, gets printed
				return false
			case msg := <-logStream:
				c.Render(-1, sse.Event{
					Event: "message",
					Data:  msg,
				})
				return true
			}
		}
	})
	if !isStreaming {
		log.Info("stream closed")
	}
}
```
My workers:
```go
func getOnePodLogs(logStream chan string, errorStream chan error, loglistenmgm chan bool, done chan bool) {
	stream, err := podLogRequest.Stream()
	if err != nil {
		log.Error(err.Error())
		errorStream <- err
		return
	}
	// Defer Close only after the error check: if Stream() failed, stream may be nil.
	defer stream.Close()
	for {
		select {
		case <-loglistenmgm:
			log.Info(pod + ": stop listening to logs") // this log line never gets triggered
			return
		default:
			buf := make([]byte, 1000)
			numBytes, err := stream.Read(buf)
			if numBytes == 0 {
				log.Info(pod + ": numBytes == 0 --> End of log")
				done <- true
				return
			}
			if err == io.EOF {
				log.Info("io.EOF")
				return
			}
			if err != nil {
				log.Error("Error getting stream.Read(buf)")
				log.Error(err)
				return
			}
			message := string(buf[:numBytes])
			logStream <- message // This works: the handler receives the message and passes it to the client
		}
	}
}
```
Thanks to https://stackoverflow.com/users/32880/jimb, who pointed out that the problem is that `stream.Read` blocks the `select`, I found the problem and solved it.
The `stream` is an `io.ReadCloser` that receives data only rarely. So when I send the `true` event on the `loglistenmgm` channel, the worker is blocked inside `stream.Read` waiting for a new message and never gets back to the `select` to receive from `loglistenmgm`. I confirmed this by writing some messages to the stream minutes after the handler had finished: only then were the workers able to read from the channel and exit.
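The blocking behaviour can be reproduced without Kubernetes. This sketch assumes an `io.Pipe` as a stand-in for the pod log stream; the real stream comes from `podLogRequest.Stream()`. The hypothetical `notifyingReader` wrapper exists only so the demo knows when the worker has entered `Read`. `blockingWorker` has the same shape as `getOnePodLogs`. The stop value sits in the buffered channel right away, but the worker only notices it after a late write unblocks `Read` and the loop returns to the `select`.

```go
package main

import (
	"fmt"
	"io"
)

// notifyingReader (hypothetical) signals on entered each time Read is called,
// so the demo can tell when the worker has left the select.
type notifyingReader struct {
	r       io.Reader
	entered chan<- struct{}
}

func (n notifyingReader) Read(p []byte) (int, error) {
	n.entered <- struct{}{}
	return n.r.Read(p)
}

// blockingWorker mirrors getOnePodLogs: a select whose default branch
// performs a blocking Read.
func blockingWorker(r io.Reader, stop <-chan bool, exited chan<- string) {
	buf := make([]byte, 1000)
	for {
		select {
		case <-stop:
			exited <- "stop signal"
			return
		default:
			n, err := r.Read(buf) // blocks until data arrives or the stream is closed
			if n == 0 || err != nil {
				exited <- "read ended"
				return
			}
		}
	}
}

// demo reports whether the worker was still running after the stop value was
// sent, and how it eventually exited.
func demo() (stillRunning bool, reason string) {
	pr, pw := io.Pipe()
	entered := make(chan struct{}, 1)
	stop := make(chan bool, 1) // buffered, like loglistenmgm
	exited := make(chan string, 1)

	go blockingWorker(notifyingReader{r: pr, entered: entered}, stop, exited)

	<-entered    // the worker has left the select and is calling Read
	stop <- true // completes immediately thanks to the buffer

	select {
	case <-exited:
		stillRunning = false
	default:
		stillRunning = true // Read has no data, so the worker cannot have returned
	}

	// A late log line unblocks Read; the loop then returns to the select,
	// finds the pending stop value and exits.
	pw.Write([]byte("late log line"))
	reason = <-exited
	return stillRunning, reason
}

func main() {
	stillRunning, reason := demo()
	fmt.Println(stillRunning, reason) // true stop signal
}
```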
I solved the problem by restructuring the program. Instead of creating the streams in the workers, I create them in the handler and pass them to the workers. When the handler is done, it closes all the streams. This makes the pending `stream.Read` return with `numBytes == 0`, so the worker exits.
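Below is a minimal sketch of this fix, again with `io.Pipe` readers standing in for the pod log streams. `runHandler` and `podWorker` are hypothetical names. The handler owns the streams and closes them on shutdown. Closing a `*io.PipeReader` makes a pending `Read` return `0, io.ErrClosedPipe`, so each worker returns and `wg.Wait()` does not hang. The worker tests `Read`'s error rather than `numBytes == 0` and handles any bytes before the error, as the `io.Reader` contract recommends.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// podWorker reads from a stream it does not own. It returns as soon as Read
// reports an error, which is what happens once the handler closes the stream.
func podWorker(id int, stream io.Reader, logStream chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	buf := make([]byte, 1000)
	for {
		n, err := stream.Read(buf)
		if n > 0 {
			// This send could also block if the handler stopped receiving;
			// that case is not covered by this sketch.
			logStream <- fmt.Sprintf("pod %d: %s", id, buf[:n])
		}
		if err != nil { // io.EOF, io.ErrClosedPipe, ...
			return
		}
	}
}

// runHandler stands in for the handler: it creates one stream per pod, hands
// each to a worker, and on shutdown closes every stream so that any Read
// blocked inside a worker returns and the worker exits.
func runHandler(numberOfPods int) string {
	logStream := make(chan string)
	var wg sync.WaitGroup
	streams := make([]*io.PipeReader, numberOfPods)
	writers := make([]*io.PipeWriter, numberOfPods)
	for i := range streams {
		streams[i], writers[i] = io.Pipe()
		wg.Add(1)
		go podWorker(i, streams[i], logStream, &wg)
	}

	// Simulate pod 0 producing one log line; the other pods stay silent,
	// so their workers sit blocked in Read.
	go writers[0].Write([]byte("hello"))
	first := <-logStream // the real handler would send this to the client

	// Shutdown: closing each reader unblocks its pending Read with
	// io.ErrClosedPipe, so every worker returns and Wait does not hang.
	for _, s := range streams {
		s.Close()
	}
	wg.Wait()
	return first
}

func main() {
	fmt.Println(runHandler(3)) // pod 0: hello
}
```

Closing the streams also acts as a broadcast: every worker is released at once, so the handler no longer has to send one `true` per pod on `loglistenmgm`.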