Yura

Reputation: 1174

How to stop all goroutines that wait for sync.Cond?

I wrote a Queue type:

type Queue struct {
    data     []interface{}
    cond     *sync.Cond
}

func New() Queue {
    return Queue{
        data: []interface{}{},
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

func (q *Queue) Push(val interface{}) {
    q.cond.L.Lock()

    q.data = append(q.data, val)

    q.cond.Signal()
    q.cond.L.Unlock()
}

func (q *Queue) Pop() (interface{}, bool) {

    q.cond.L.Lock()
    for len(q.data) == 0 {
        q.cond.Wait()
    }

    retVal := q.data[0]

    q.data = q.data[1:]

    q.cond.L.Unlock()

    return retVal, true
}

func (q *Queue) Close() {

}

If the queue is empty, Pop() callers block. Is there any way to wake all goroutines that are blocked in Pop() using only Cond calls?

Of course, I can do something like this:

type Queue struct {
    data     []interface{}
    cond     *sync.Cond
    chanStop chan interface{}
}


func (q *Queue) Pop() (interface{}, bool) {

    var retVal interface{}
    retFlag := false

    select {
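    case <-q.chanStop:
        // Queue closed: leave the select and return (nil, false). The helper
        // goroutine below stays blocked in Wait until a later Push, whose
        // value it would then remove from the queue and silently discard.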
    case <-func() <-chan interface{} {
        out := make(chan interface{})

        go func() {
            defer close(out)

            q.cond.L.Lock()
            for len(q.data) == 0 {
                q.cond.Wait()
            }

            retVal = q.data[0]
            retFlag = true

            q.data = q.data[1:]

            q.cond.L.Unlock()
        }()
        return out
    }():
    }

    return retVal, retFlag
}

func (q *Queue) Close() {
    close(q.chanStop)
}

But maybe there's some way to stop waiting without all this select verbosity?

UPDATE:

Actually, it can be done like this:

package queue

import "sync"

type Queue struct {
    data     []interface{}
    cond     *sync.Cond
    stop     bool
}

func New() Queue {
    return Queue{
        data: []interface{}{},
        cond: sync.NewCond(&sync.Mutex{}),
        stop: false,
    }
}

func (q *Queue) Push(val interface{}) {
    q.cond.L.Lock()

    q.data = append(q.data, val)

    q.cond.Signal()
    q.cond.L.Unlock()
}

func (q *Queue) Pop() (interface{}, bool) {
    q.cond.L.Lock()
    for len(q.data) == 0 && !q.stop {
        q.cond.Wait()
    }

    if q.stop {
        q.cond.L.Unlock()
        return nil, false
    }

    retVal := q.data[0]

    q.data = q.data[1:]

    q.cond.L.Unlock()

    return retVal, true
}

func (q *Queue) Close() {
    q.cond.L.Lock()

    q.stop = true

    q.cond.Broadcast()
    q.cond.L.Unlock()
}

And yes, sync.Cond is sooo weird.

Upvotes: 3

Views: 984

Answers (1)

icza

Reputation: 418435

You could wake all clients waiting in Pop() with Cond.Broadcast(), but then you would also have to handle the case where q.data is empty and there's nothing to return.

Moreover, if clients keep calling Pop() after the queue has been closed, Pop() must also check whether the queue has already been closed and return early instead of entering the waiting state.

Generally, sync.Cond is under-documented and incompatible with other Go synchronization patterns (e.g. select). Many don't consider it to be a useful synchronization primitive in Go, and it may potentially be removed in Go 2 (see details).

Channels may be used instead of sync.Cond: closing a channel corresponds to Cond.Broadcast(), and sending a value on a channel corresponds to Cond.Signal().
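
To illustrate the Broadcast analogy, here is a minimal sketch. The function name wakeAll and the done channel are made up for this example; the point is only that a single close releases every goroutine blocked on a receive.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAll (hypothetical helper) starts n goroutines that block on a
// channel, closes the channel once, and returns how many were released.
func wakeAll(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until done is closed
			mu.Lock()
			woken++
			mu.Unlock()
		}()
	}

	close(done) // plays the role of Cond.Broadcast(): all receivers proceed
	wg.Wait()
	return woken
}

func main() {
	fmt.Println(wakeAll(5)) // prints 5
}
```

Unlike Broadcast, a close is permanent: any goroutine that receives from done later also returns immediately, so there is no need for a separate "already closed" flag.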

Back to your example. The simplest concurrency-safe queue is a buffered channel itself: push is a send on the channel, and pop is a receive from it. Channels are safe for concurrent use.
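
A rough sketch of what that could look like, keeping the method names from the question. The capacity parameter of New is an assumption of this sketch, and the behavior is not identical to the stop-flag version: after Close, values still in the buffer are delivered before Pop starts reporting false.

```go
package main

import "fmt"

// Queue is a channel-backed queue; its capacity is fixed at creation.
type Queue struct {
	ch chan interface{}
}

// New creates a queue that can buffer up to capacity values.
func New(capacity int) *Queue {
	return &Queue{ch: make(chan interface{}, capacity)}
}

// Push blocks while the buffer is full. It must not be called after
// Close, because sending on a closed channel panics.
func (q *Queue) Push(val interface{}) {
	q.ch <- val
}

// Pop blocks until a value is available. It returns (nil, false) once
// the queue has been closed and all buffered values have been drained.
func (q *Queue) Pop() (interface{}, bool) {
	val, ok := <-q.ch
	return val, ok
}

// Close wakes every goroutine blocked in Pop. Call it at most once.
func (q *Queue) Close() {
	close(q.ch)
}

func main() {
	q := New(8)
	q.Push("a")
	q.Push("b")
	q.Close()

	for {
		v, ok := q.Pop()
		if !ok {
			break
		}
		fmt.Println(v) // prints a, then b
	}
}
```

The caveat with this design is on the producer side: pushers and the closer have to coordinate so that no Push happens after Close.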

One limitation of a buffered channel is that its buffer size is fixed: once the channel is created, the size cannot be changed. Still, I think it's a small price to pay to allocate a big buffer up front and not worry about anything later. A send on a channel whose buffer is full does not panic; it "just" blocks until someone receives from the channel.
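
If blocking on a full buffer is not acceptable, a select with a default case turns the send into a non-blocking attempt. This is only a sketch; tryPush is a hypothetical helper, not something from the question.

```go
package main

import "fmt"

// tryPush (hypothetical helper) attempts a send without blocking.
// It reports false if the channel's buffer is currently full.
func tryPush(ch chan interface{}, val interface{}) bool {
	select {
	case ch <- val:
		return true
	default:
		// Buffer full: give up instead of waiting for a receiver.
		return false
	}
}

func main() {
	ch := make(chan interface{}, 1)
	fmt.Println(tryPush(ch, "first"))  // prints true
	fmt.Println(tryPush(ch, "second")) // prints false, buffer is full
	<-ch                               // free one slot
	fmt.Println(tryPush(ch, "third"))  // prints true
}
```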

Upvotes: 4
