AVarf

Reputation: 5149

Child goroutine doesn't receive message from the parent channel

I have a server that, on receiving a request, starts goroutines to read messages from different streams and send them to the parent goroutine. The parent goroutine aggregates the messages and sends them to the client. It looks like this:

Client --> API --> handler --> worker1
                          |--> worker2
                          |--> worker3

I use separate channels to communicate between these goroutines. The workers write to the aggregated channel, and the handler (the parent goroutine) receives from it and sends the messages back to the client, so everything works, ALMOST :)

The problem is that my workers never receive the message the handler sends on the channel, so the worker goroutines never stop, and I cannot figure out why they don't receive it.

Since the code is huge, I have included only the relevant parts.

Note: loglistenmgm, the channel whose message the workers need to receive, has a buffer the same size as the number of workers. If I remove the buffer, the info log after writing true to it never gets printed (which means no goroutine is listening), and the log hangs after these two lines:

INFO[0010] Request.Context().Done triggered             
INFO[0010] **Sending true event to loglistenmgm: 0

My handler:

func handler() {
        logStream := make(chan string)
        done := make(chan bool, 5)
        loglistenmgm := make(chan bool, numberOfPods)
        errorStream := make(chan error)

        for _, pod := range pods {
            go getOnePodLogs(logStream, errorStream, loglistenmgm, done)
        }

        go func() {
            for {
                select {
                case <-c.Request.Context().Done():
                    log.Info("Request.Context().Done triggered")
                    for i := 0; i < numberOfPods; i++ {
                        log.Info("**Sending true event to loglistenmgm: " + strconv.Itoa(i))
                        loglistenmgm <- true
                        log.Info("**After sending true event to loglistenmgm") // This gets printed so the true has been sent to the channel
                    }
                    done <- true
                    return
                case err := <-errorStream:
                    c.JSON(http.StatusInternalServerError, map[string]string{
                        "message": "Internal Server Error: " + err.Error(),
                    })
                    for i := 0; i < numberOfPods; i++ {
                        loglistenmgm <- true
                    }
                    done <- true
                    return
                }
            }
        }()

        isStreaming := c.Stream(func(w io.Writer) bool {
            for {
                select {
                case <-done:
                    c.SSEvent("end", "end") // This also works properly and it can print the "stream closed" which happens after this
                    return false
                case msg := <-logStream:
                    c.Render(-1, sse.Event{
                        Event: "message",
                        Data:  msg,
                    })
                    return true
                }
            }
        })
        if !isStreaming {
            log.Info("stream closed")
        }
}

My workers:

func getOnePodLogs(logStream chan string, errorStream chan error, loglistenmgm chan bool, done chan bool) {
    stream, err := podLogRequest.Stream()
    if err != nil {
        log.Error(err.Error())
        errorStream <- err
        return
    }
    // Defer Close only after the error check: stream may be nil when err != nil.
    defer stream.Close()

    for {
        select {
        case <-loglistenmgm:
            log.Info(pod + " stop listening to logs") // this log line never gets triggered
            return
        default:
            buf := make([]byte, 1000)
            numBytes, err := stream.Read(buf)
            if numBytes == 0 {
                log.Info(pod + ": numBytes == 0 --> End of log")
                done <- true
                return
            }
            if err == io.EOF {
                log.Info("io.EOF")
                return
            }
            if err != nil {
                log.Error("Error getting stream.Read(buf)")
                log.Error(err)
                return
            }
            message := string(buf[:numBytes])
            logStream <- message // This works and I can receive the message on handler and it can pass it to the client
        }
    }

}

Upvotes: 0

Views: 424

Answers (1)

AVarf

Reputation: 5149

Thanks to https://stackoverflow.com/users/32880/jimb, who pointed out that the problem is stream.Read blocking the select, I found the problem and solved it.

The stream is an io.ReadCloser that receives data infrequently, and stream.Read blocks until data arrives. So when I send the true event to the loglistenmgm channel, the worker is stuck inside stream.Read waiting for new data and never gets back to the select to read from loglistenmgm. I confirmed this by writing a message to the stream minutes after the handler had finished; the workers were then able to read from the channel and exit.
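The blocking behaviour can be reproduced in isolation. The following is only a minimal sketch, assuming io.Pipe as a stand-in for a quiet log stream; signalReader, runDemo, and exitedWithin are hypothetical names for this illustration and are not part of the original program.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// signalReader is a hypothetical wrapper that announces when Read is entered,
// so the demo sends the stop signal only after the worker is past its select.
type signalReader struct {
	r       io.Reader
	entered chan struct{}
}

func (s *signalReader) Read(p []byte) (int, error) {
	select {
	case s.entered <- struct{}{}:
	default: // a signal is already pending; do not block here
	}
	return s.r.Read(p)
}

// worker mirrors the loop from the question: check the stop channel,
// otherwise fall into a blocking Read in the default branch.
func worker(r io.Reader, stop <-chan bool, exited chan<- struct{}) {
	defer close(exited)
	buf := make([]byte, 1000)
	for {
		select {
		case <-stop:
			return
		default:
			if _, err := r.Read(buf); err != nil {
				return
			}
		}
	}
}

// exitedWithin reports whether the worker finished within d.
func exitedWithin(exited <-chan struct{}, d time.Duration) bool {
	select {
	case <-exited:
		return true
	case <-time.After(d):
		return false
	}
}

func runDemo() (afterStop, afterData bool) {
	pr, pw := io.Pipe() // assumption: stands in for the pod log stream
	sr := &signalReader{r: pr, entered: make(chan struct{}, 1)}
	stop := make(chan bool, 1)
	exited := make(chan struct{})
	go worker(sr, stop, exited)

	<-sr.entered // the worker has left the select and is heading into Read
	stop <- true // the buffered send succeeds although nobody is receiving
	afterStop = exitedWithin(exited, 200*time.Millisecond)

	pw.Write([]byte("late log line\n")) // Read returns; the loop reaches select again
	afterData = exitedWithin(exited, 2*time.Second)
	return afterStop, afterData
}

func main() {
	afterStop, afterData := runDemo()
	fmt.Println("exited after stop:", afterStop) // prints "exited after stop: false"
	fmt.Println("exited after data:", afterData) // prints "exited after data: true"
}
```

The buffered send explains why the "After sending true event" log line was printed even though no worker ever consumed the value.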

I solved the problem by restructuring the program: instead of creating the streams in the workers, I create them in the handler and pass them to the workers. When the handler is done, it closes all the streams, which triggers the numBytes == 0 branch and stops each worker.
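A rough sketch of that structure, not the actual code: io.Pipe again stands in for the pod log streams, and worker and runHandler are hypothetical names. It relies on a blocked Read returning once the reader is closed, which holds for io.Pipe and is assumed here for the real stream.

```go
package main

import (
	"fmt"
	"io"
)

// worker reads until the stream returns an error or zero bytes.
// It needs no stop channel: closing the stream is the stop signal.
func worker(id int, stream io.Reader, logStream chan<- string, exited chan<- int) {
	defer func() { exited <- id }()
	buf := make([]byte, 1000)
	for {
		n, err := stream.Read(buf)
		if n > 0 {
			logStream <- string(buf[:n])
		}
		if err != nil || n == 0 {
			return
		}
	}
}

// runHandler owns the streams: it creates them, hands them to the workers,
// and closes them when it is done.
func runHandler(numWorkers int) (string, int) {
	logStream := make(chan string)
	exited := make(chan int, numWorkers)
	streams := make([]io.ReadCloser, numWorkers)
	writers := make([]*io.PipeWriter, numWorkers)

	for i := range streams {
		pr, pw := io.Pipe() // assumption: stands in for podLogRequest.Stream()
		streams[i], writers[i] = pr, pw
		go worker(i, pr, logStream, exited)
	}

	// Simulate one log line arriving on the first stream.
	writers[0].Write([]byte("one log line"))
	msg := <-logStream

	// Handler is done: closing every stream unblocks every pending Read.
	for _, s := range streams {
		s.Close()
	}

	stopped := 0
	for range streams {
		<-exited
		stopped++
	}
	return msg, stopped
}

func main() {
	msg, stopped := runHandler(3)
	fmt.Printf("received %q, workers stopped: %d\n", msg, stopped)
	// prints: received "one log line", workers stopped: 3
}
```

One caveat with this design: closing the stream only unblocks a worker that is waiting in Read. A worker blocked on the send to logStream after the handler has stopped receiving would still need a separate way out.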

Upvotes: 0
