Reputation: 159
I'm running a Golang TCP server that accepts a connection, spawns a goroutine to handle that connection, and that handler in turn spawns another goroutine that reads from the connection and writes the data to a channel. There's also a simple function that reads data from the channel.
The idea is that if the client closes the connection, the ReadFromConn goroutine will get an error on read and return. Its deferred code would then write to the done channel and close both the done and queue channels. The consumer would stop processing as soon as the done channel has data and return whatever result it had at that moment.
The other case is that the consumer decides it has enough data to make a decision and returns. That leads execution back to the Handle function; when Handle returns, its defer closes the connection, which makes the ReadFromConn goroutine get a read error and close all the pending channels.
This works great, but while doing some load testing I noticed that, after the test is done, the memory usage of the Golang server does not decrease the way it does in every other part of the application once the load stops. I took some of the IDs and checked the logs. Sometimes (not all the time) the ERROR READING FROM CONN message is shown, but there is no READ_FROM_CONN DEFER log, so I think the defer is never being called. As a result, ProcessQueue hangs reading from the channel, because it is never closed, and the CLOSING... log from the Handle function is missing too.
At least, that's what I think is happening, and I believe it's why the memory consumption never drops when the load test ends: the code is still running goroutines that are blocked reading from channels that should have been closed by the deferred code in ReadFromConn. This behavior is not predictable; it doesn't happen with every connection, so I don't know what could be wrong.
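One way I can confirm that goroutines are piling up is to watch the goroutine count and dump goroutine stacks while the test runs. This is a minimal sketch using only the standard library; the localhost:6060 address is arbitrary:

import (
    "fmt"
    "net/http"
    _ "net/http/pprof"
    "runtime"
    "time"
)

func init() {
    // Expose the pprof endpoints for debugging.
    go func() {
        fmt.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    // Periodically log the goroutine count; if it keeps growing
    // after the load stops, goroutines are leaking.
    go func() {
        for {
            fmt.Println("goroutines:", runtime.NumGoroutine())
            time.Sleep(5 * time.Second)
        }
    }()
}

With that in place, http://localhost:6060/debug/pprof/goroutine?debug=2 dumps the stack of every live goroutine, which shows exactly which channel operation each one is blocked on.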
Here is a simplified version of my Golang server:
package main

import (
    "fmt"
    "io"
    "net"
    "os"
)

type CustomStruct struct {
    Type string
    Stop bool
}

func main() {
    // Creates the server
    server, err := net.Listen("tcp", "0.0.0.0:80")
    if err != nil {
        fmt.Println("failed to bind listener to socket", err)
        os.Exit(1)
    }
    defer server.Close()
    fmt.Println("Listening new connections V2")
    // Accepts new connections
    for {
        conn, err := server.Accept()
        if err != nil {
            fmt.Println("failed to accept new connection:", err)
            continue
        }
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    defer conn.Close()
    id := "some uuid for each conn"
    // Creates channels
    queue := make(chan []byte, 512)
    done := make(chan bool)
    // Starts reading from the connection
    go ReadFromConn(id, conn, queue, done)
    result := ProcessQueue(id, queue, done)
    fmt.Println(id, "CLOSING...")
    // Do stuff with result...
    fmt.Println(id, result)
}

func ReadFromConn(
    id string,
    conn io.Reader,
    queue chan []byte,
    done chan bool,
) {
    defer func() {
        done <- true
        close(queue)
        close(done)
        fmt.Println(id, "READ_FROM_CONN DEFER")
    }()
    tmp := make([]byte, 256)
    for {
        _, err := conn.Read(tmp)
        if err != nil {
            fmt.Println(id, "ERROR READING FROM CONN "+err.Error())
            return
        }
        if tmp[0] == 0x00 {
            return
        }
        queue <- tmp
    }
}

func ProcessQueue(
    id string,
    queue chan []byte,
    done chan bool,
) CustomStruct {
    defer fmt.Println(id, "GET_TRANSCRIPTION_RESULT ENDED")
    fmt.Println(id, "GET_TRANSCRIPTION_RESULT STARTED")
    result := CustomStruct{
        Type: "transcription",
        Stop: false,
    }
    for {
        select {
        case <-done:
            fmt.Println(id, "DONE DETECTED")
            return result
        default:
            fmt.Println(id, "DEFAULT")
            payload, open := <-queue
            if !open {
                fmt.Println(id, "QUEUE IS CLOSED")
                return result
            }
            fmt.Println(id, "QUEUE IS OPEN")
            // ... Do stuff with the payload; if the result of
            // processing meets a certain condition, return
            if payload[0] == 0x01 {
                return result
            }
        }
    }
}
Upvotes: 1
Views: 1429
Reputation: 23684
Not exactly sure what the issue is, but controlling the lifespan of the reader could be achieved a lot more cleanly using a context. The context avoids having to manage the channel objects as closely, and it provides a clean way to report errors from the goroutine, if needed, using context.Cause(ctx).
Example setup could look like this:
ctx, cancel := context.WithCancelCause(context.Background())
queue := make(chan []byte, 512)
go ReadFromConn(id, conn, queue, ctx, cancel)
result := ProcessQueue(id, queue, ctx, cancel)
The defer call can place an error in the context's cause, which closes the channel returned by ctx.Done. Unlike the simple channel approach, where a sent value is received by just one goroutine, a close on the Done channel is observed by every receiver, so any number of goroutines can react to the cancellation instead of just one.
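For example, after ProcessQueue returns, Handle could log why the pipeline stopped (a hypothetical log line, reusing the id from the question):

fmt.Println(id, "stopped because:", context.Cause(ctx))

Only the first cancel call sets the cause, so this reports whichever side shut down first.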
When sending to the queue, you can select on ctx.Done as well, to prevent waiting forever on a queue that is no longer being read from. Similarly, you can do the same when reading from the queue.
func ReadFromConn(...) {
    defer cancel(fmt.Errorf("reader defer"))
    ...
    for {
        ...
        select {
        case queue <- tmp:
        case <-ctx.Done():
            return
        }
    }
}
func ProcessQueue(...) {
    defer cancel(fmt.Errorf("queue defer"))
    ...
    for {
        select {
        case <-ctx.Done():
            ...
        case payload := <-queue:
            ...
        }
    }
}
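Putting it together, a fuller sketch could look like the following. The CustomStruct type, the log messages, and the early-exit conditions are copied from the question; the cause messages and the fresh buffer per read are illustrative choices, not requirements:

package main

import (
    "context"
    "fmt"
    "net"
)

type CustomStruct struct {
    Type string
    Stop bool
}

func main() {
    server, err := net.Listen("tcp", "0.0.0.0:80")
    if err != nil {
        fmt.Println("failed to bind listener to socket", err)
        return
    }
    defer server.Close()
    for {
        conn, err := server.Accept()
        if err != nil {
            continue
        }
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    defer conn.Close()
    id := "some uuid for each conn"
    ctx, cancel := context.WithCancelCause(context.Background())
    defer cancel(fmt.Errorf("handler done")) // a no-op if a cause was already set
    queue := make(chan []byte, 512)
    go ReadFromConn(ctx, cancel, id, conn, queue)
    result := ProcessQueue(ctx, cancel, id, queue)
    fmt.Println(id, "CLOSING...", context.Cause(ctx))
    fmt.Println(id, result)
}

func ReadFromConn(ctx context.Context, cancel context.CancelCauseFunc, id string, conn net.Conn, queue chan<- []byte) {
    defer cancel(fmt.Errorf("reader defer"))
    for {
        // Allocate a fresh buffer per read so slices already queued
        // are not overwritten by the next Read.
        tmp := make([]byte, 256)
        n, err := conn.Read(tmp)
        if err != nil {
            fmt.Println(id, "ERROR READING FROM CONN "+err.Error())
            return
        }
        if n > 0 && tmp[0] == 0x00 {
            return
        }
        select {
        case queue <- tmp[:n]:
        case <-ctx.Done(): // the consumer is gone; stop instead of blocking
            return
        }
    }
}

func ProcessQueue(ctx context.Context, cancel context.CancelCauseFunc, id string, queue <-chan []byte) CustomStruct {
    defer cancel(fmt.Errorf("queue defer"))
    result := CustomStruct{Type: "transcription", Stop: false}
    for {
        select {
        case <-ctx.Done():
            fmt.Println(id, "DONE DETECTED")
            return result
        case payload := <-queue:
            // ... process payload; return early when the condition is met
            if len(payload) > 0 && payload[0] == 0x01 {
                return result
            }
        }
    }
}

Because the Done channel is closed on cancellation rather than sent to, both goroutines unblock regardless of which side finishes first, and nothing is left waiting on a channel that no one reads.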
Upvotes: 1