abarnert

Race when pausing a group of goroutines

I have a bunch of goroutines doing something in a loop. I want to be able to pause all of them, run some arbitrary code, then resume them. The way I attempted to do this is probably not idiomatic (and I'd appreciate a better solution), but I can't understand why it doesn't work.

Stripped down to the essentials (driver code at the bottom):

type looper struct {
    pause  chan struct{}
    paused sync.WaitGroup
    resume chan struct{}
}

func (l *looper) loop() {
    for {
        select {
        case <-l.pause:
            l.paused.Done()
            <-l.resume
        default:
            dostuff()
        }
    }
}

func (l *looper) whilePaused(fn func()) {
    l.paused.Add(32)
    l.resume = make(chan struct{})
    close(l.pause)
    l.paused.Wait()
    fn()
    l.pause = make(chan struct{})
    close(l.resume)
}

I spin up 32 goroutines all running loop(), then call whilePaused 100 times in a row, and everything seems to work… but if I run it with -race, it tells me that there's a race on l.resume between writing it in whilePaused (l.resume = make(chan struct{})) and reading it in loop (<-l.resume).

I don't understand why this happens. According to The Go Memory Model, that close(l.pause) should happen before the <-l.pause in every loop goroutine. This should mean the make(chan struct{}) value is visible as the value of l.resume in all of those loop goroutines, in the same way the string "hello world" is visible as the value of a in the f goroutine in the docs example.


Some additional information that might be relevant:


Here's the rest of my code, in case you want to run the whole thing:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// looper code from above

var n int64

func dostuff() {
    atomic.AddInt64(&n, 1)
}

func main() {
    l := &looper{
        pause: make(chan struct{}),
    }
    var init sync.WaitGroup
    init.Add(32)
    for i := 0; i < 32; i++ {
        go func() {
            init.Done()
            l.loop()
        }()
    }
    init.Wait()
    for i := 0; i < 100; i++ {
        l.whilePaused(func() { fmt.Printf("%d ", i) })
    }
    fmt.Printf("\n%d\n", atomic.LoadInt64(&n))
}

Answers (1)

Nikunj Yadav

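This is because after a looper goroutine calls l.paused.Done(), the goroutine running whilePaused is free to finish the current call, go around its loop, and assign l.resume again before the looper has read it. The looper's read of l.resume (in <-l.resume) happens after its Done(), so nothing orders it before that next write, and the race detector reports it.

Here is the sequence of operations:

Looper goroutine  |  Pauser goroutine (whilePaused)
---------------------------------------------------
l.paused.Done()   |
                  |  l.paused.Wait() returns
                  |  fn()
                  |  l.pause = make(chan struct{})
                  |  close(l.resume)
                  |  round the loop
                  |  l.paused.Add(32)
<-l.resume        |  l.resume = make(chan struct{})   !!! RACE !!!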
