Reputation: 786
I have a function which uses a goroutine to read data from network. This go routine has 2 selects
stop := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func((){
defer wg.Done()
ticker := time.NewTicker(restartDuration)
defer ticker.Stop()
for {
listener, err := net.ListenUDP()
if err != nil {
return err)
}
L: // Labelled for loop
for {
select {
case <-stop:
return
default:
n, _, err := listener.ReadFrom(inboundRTPPacket)
if err != nil {
break L
}
}
}
if err = listener.Close(); err != nil {
logrus.Warn("Could not close udp listener")
}
select {
case <-stop:
return
case <-ticker.C:
}
}
}()
A close(stop)
call is received and acted upon by the outer select if the inner select is removed but does not work with the the inner select.
I am not sure what am I missing here,
Upvotes: 1
Views: 79
Reputation: 22117
Your code is read-blocking here:
n, _, err := listener.ReadFrom(inboundRTPPacket)
Even if close(stop)
is performed later - it will not unblock the code above - because net.Listener
is not aware of your cancelation mechanism.
context.Context is designed for this specific purpose - pass in a context
to any potentially blocking call. If the call blocks and the context
is canceled, then the call will unblock.
io.Reader
's read operations do not come with context support. It's a very difficult problem:
A quick & dirty solution for you, for demonstration purposes:
// watches for cancelation:
go func() {
<-stop
listener.Close()
}()
// ...
// this will unblock upon `close(stop)` with an error
// as the connection is closed
n, _, err := listener.ReadFrom(inboundRTPPacket)
Note: you must close(stop)
after each connection-handler to avoid a goroutine leak.
Since you are using a *net.UDPConn
, a more robust technique would be to leverage the SetReadDeadline
method to cancel blocked read calls e.g.
go func() {
<-stop // watch for cancelations
var err error
// ensure 'listener' has the `SetReadDeadline` method
// (which *net.UDPConn does)
if rd, ok := listener.(interface {
SetReadDeadline(time.Time) error
}); ok {
err = rd.SetReadDeadline(time.Now()) // unblock any blocked reads
} else {
err = listener.Close() // no deadline method, so default to just closing
}
if err != nil {
log.Println(err)
}
}()
again be sure to close(stop)
to avoid goroutine leaks.
Upvotes: 1