Reputation: 366013
I have a bunch of goroutines doing something in a loop. I want to be able to pause all of them, run some arbitrary code, then resume them. The way I attempted to do this is probably not idiomatic (and I'd appreciate a better solution), but I can't understand why it doesn't work.
Stripped down to the essentials (driver code at the bottom):
```go
type looper struct {
	pause  chan struct{}
	paused sync.WaitGroup
	resume chan struct{}
}

func (l *looper) loop() {
	for {
		select {
		case <-l.pause:
			l.paused.Done()
			<-l.resume
		default:
			dostuff()
		}
	}
}

func (l *looper) whilePaused(fn func()) {
	l.paused.Add(32)
	l.resume = make(chan struct{})
	close(l.pause)
	l.paused.Wait()
	fn()
	l.pause = make(chan struct{})
	close(l.resume)
}
```
I spin up 32 goroutines all running `loop()`, then call `whilePaused` 100 times in a row, and everything seems to work… but if I run it with `-race`, it tells me that there's a race on `l.resume` between writing it in `whilePaused` (`l.resume = make(chan struct{})`) and reading it in `loop` (`<-l.resume`).
I don't understand why this happens. According to The Go Memory Model, that `close(l.pause)` should happen before the `<-l.pause` in every `loop` goroutine. This should mean the `make(chan struct{})` value is visible as the value of `l.resume` in all of those `loop` goroutines, in the same way the string "hello world" is visible as the value of `a` in the `f` goroutine in the docs example.
Some additional information that might be relevant:
- If I replace `l.resume` with an `unsafe.Pointer` and access the `chan struct{}` value with `atomic.LoadPointer` in `loop` and `atomic.StorePointer` in `whilePaused`, the race goes away. This seems to provide the exact same acquire-release ordering that the channel is already supposed to provide?
- If I add a `time.Sleep(10 * time.Microsecond)` between the `l.paused.Done()` and `<-l.resume`, the program usually deadlocks after calling `fn` one or two times.
  - If I add a `fmt.Printf(".")` instead, the program prints 28 `.`s, calls the first function, prints another 32 `.`s, then hangs (or, occasionally, calls the second function, then prints another 32 `.`s and hangs).
Here's the rest of my code, in case you want to run the whole thing:
```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// looper code from above

var n int64

func dostuff() {
	atomic.AddInt64(&n, 1)
}

func main() {
	l := &looper{
		pause: make(chan struct{}),
	}
	var init sync.WaitGroup
	init.Add(32)
	for i := 0; i < 32; i++ {
		go func() {
			init.Done()
			l.loop()
		}()
	}
	init.Wait()
	for i := 0; i < 100; i++ {
		l.whilePaused(func() { fmt.Printf("%d ", i) })
	}
	fmt.Printf("\n%d\n", atomic.LoadInt64(&n))
}
```
Upvotes: 1
Views: 408
Reputation: 92
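This is because after a looper goroutine calls `l.paused.Done()`, the pauser goroutine is able to finish the round, go around the loop and assign `l.resume` again before that looper has executed `<-l.resume`. Nothing orders that write after the looper's read, so the race detector flags it.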
Here is the sequence of operations:
```
Looper goroutine   | Pauser goroutine
-------------------+-----------------------------------------------------
l.paused.Done()    |
                   | l.paused.Wait()
                   | fn()
                   | l.pause = make(chan struct{})
                   | close(l.resume)
                   | round the loop
                   | l.paused.Add(numThreads)
<-l.resume         | l.resume = make(chan struct{})   !!! RACE !!!
```
Upvotes: 4