Akshat Goel
Akshat Goel

Reputation: 786

Receiving data on a channel with a blocking default

I have a function which uses a goroutine to read data from network. This go routine has 2 selects

  1. Outer select, with a timer, to restart reading if the reader fails
  2. Inner select that actually reads the data And all of this is wrapped in a waitGroup
stop := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)

go func((){
defer wg.Done()
        ticker := time.NewTicker(restartDuration)
        defer ticker.Stop()
        for {
            listener, err := net.ListenUDP()
             if err != nil {
                return err)
            }

          L: // Labelled for loop 

                for {
                    select {
                    case <-stop:
                        return
                    default:
                        n, _, err := listener.ReadFrom(inboundRTPPacket)
                        if err != nil {
                            break L
                        }

                    }
                }
                if err = listener.Close(); err != nil {
                    logrus.Warn("Could not close udp listener")
                }


            select {
            case <-stop:
                return
            case <-ticker.C:
            }
        }
}()

A close(stop) call is received and acted upon by the outer select if the inner select is removed but does not work with the the inner select.

I am not sure what am I missing here,

Upvotes: 1

Views: 79

Answers (1)

colm.anseo
colm.anseo

Reputation: 22117

Your code is read-blocking here:

n, _, err := listener.ReadFrom(inboundRTPPacket)

Even if close(stop) is performed later - it will not unblock the code above - because net.Listener is not aware of your cancelation mechanism.

context.Context is designed for this specific purpose - pass in a context to any potentially blocking call. If the call blocks and the context is canceled, then the call will unblock.

io.Reader's read operations do not come with context support. It's a very difficult problem:

A quick & dirty solution for you, for demonstration purposes:

// watches for cancelation:
go func() {
    <-stop
    listener.Close()
}()

// ...

// this will unblock upon `close(stop)` with an error
// as the connection is closed
n, _, err := listener.ReadFrom(inboundRTPPacket) 

Note: you must close(stop) after each connection-handler to avoid a goroutine leak.


Since you are using a *net.UDPConn, a more robust technique would be to leverage the SetReadDeadline method to cancel blocked read calls e.g.

go func() {

    <-stop // watch for cancelations

    var err error

    // ensure 'listener' has the `SetReadDeadline` method
    // (which *net.UDPConn does)
    if rd, ok := listener.(interface {
        SetReadDeadline(time.Time) error
    }); ok {
        err = rd.SetReadDeadline(time.Now()) // unblock any blocked reads

    } else {
        err = listener.Close() // no deadline method, so default to just closing
    }

    if err != nil {
        log.Println(err)
    }
}()

again be sure to close(stop) to avoid goroutine leaks.

Upvotes: 1

Related Questions