Reputation: 237
I am working on an exercise in using channels to implement a queue. Specifically, I am trying to use the size of a channel to limit the number of simultaneous goroutines. To wit, I have written the following code:
package main
import "fmt"
import "time"
import "math/rand"
func runTask (t string, ch *chan bool) {
	start := time.Now()
	fmt.Println("starting task", t)
	time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
	fmt.Println("done running task", t, "in", time.Since(start))
	<- *ch
}
func main() {
	numWorkers := 3
	files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	activeWorkers := make(chan bool, numWorkers)
	for _, f := range files {
		activeWorkers <- true
		fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers))
		go runTask(f, &activeWorkers)
	}
	select{}
}
Right now, the code crashes with:
throw: all goroutines are asleep - deadlock!
My expectation was that the call to select would block forever and let the goroutines terminate without a deadlock.
So I have a two-fold question: why isn't select blocking forever and, short of throwing in a time.Sleep() call after the for loop, how can I avoid deadlocks?
Cheers,
-mtw
Upvotes: 5
Views: 1696
Reputation: 19347
Firstly, you don't need to pass a pointer to the channel. Channels, like maps, are reference types: passing one copies only a small handle to the underlying data, not the data itself. If you ever need a pointer to a chan itself, you'll know when that time comes.
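To illustrate the point about channels being references, here is a minimal sketch. The names send and roundTrip are made up for this example and do not appear in the question. The channel is passed by value, yet the caller reads the value back from its own variable:

```go
package main

import "fmt"

// send receives the channel by value; the copy still refers to the same
// underlying channel as the caller's variable.
func send(ch chan int, v int) {
	ch <- v
}

// roundTrip (hypothetical helper) sends v through send and reads it back
// from the caller's own channel variable.
func roundTrip(v int) int {
	ch := make(chan int, 1) // buffered so send does not block
	send(ch, v)
	return <-ch
}

func main() {
	fmt.Println(roundTrip(42))
}
```

No pointer is involved anywhere, which is why runTask can simply take a chan bool.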
The crash occurs because the program reaches a state where every goroutine is blocked. The runtime treats that as a fatal error: if every goroutine is blocked, nothing can ever wake any of them up, and the program would otherwise hang forever.
The primary goroutine winds up in a select {}, not waiting for anyone, just hanging. Once the last runTask goroutine finishes, only the primary goroutine is left, and it is waiting on no one.
You'll need to add some way to know when every goroutine has finished; perhaps another channel can receive finish events.
This is a bit ugly, but might be some inspiration.
package main
import "fmt"
import "time"
import "math/rand"
func runTask(t string, ch chan bool, finishedCh chan bool) {
	start := time.Now()
	fmt.Println("starting task", t)
	time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
	fmt.Println("done running task", t, "in", time.Since(start))
	<-ch               // free a slot so the next task can start
	finishedCh <- true // report that this task is finished
}
func main() {
	numWorkers := 3
	files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	activeWorkers := make(chan bool, numWorkers)
	finishedWorkers := make(chan bool)
	done := make(chan bool)
	// count finish events; signal done once every task has reported in
	go func() {
		remaining := len(files)
		for remaining > 0 {
			<-finishedWorkers
			remaining--
		}
		done <- true
	}()
	for _, f := range files {
		activeWorkers <- true // blocks while numWorkers tasks are running
		fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers))
		go runTask(f, activeWorkers, finishedWorkers)
	}
	<-done // wait for the counting goroutine instead of select {}
}
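The finish-event counting above can also be expressed with sync.WaitGroup from the standard library. The following is only a sketch under that assumption, not the code from this answer. runAll and its parameters are hypothetical names, and the fake processing time is left out to keep it short:

```go
package main

import (
	"fmt"
	"sync"
)

// runAll (hypothetical helper) runs one goroutine per item, with at most
// limit of them active at once, and returns how many tasks completed.
func runAll(items []string, limit int) int {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	completed := 0

	for _, it := range items {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends
			fmt.Println("running task", name)
			mu.Lock()
			completed++
			mu.Unlock()
		}(it)
	}
	wg.Wait() // returns only after every goroutine has called Done
	return completed
}

func main() {
	n := runAll([]string{"a", "b", "c", "d", "e"}, 3)
	fmt.Println(n, "tasks completed")
}
```

Because main blocks in wg.Wait() only while tasks are still outstanding, it never ends up as the sole blocked goroutine.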
Upvotes: 4
Reputation: 2197
tux21b has already posted a more idiomatic solution, but I would like to answer your question a different way. select{} does block forever, yes. A deadlock occurs when all goroutines are blocked. If all your other goroutines finish, then you only have the blocked main goroutine left, which is a deadlock.
Normally, you want to do something in your main goroutine after all the others have finished, either using their results or just cleaning up, and for that you'd do what tux21b suggested. If you really just want main to finish and leave the rest of the goroutines to do their job, put defer runtime.Goexit() at the top of your main function. This will cause the main goroutine to exit without exiting the program. Note that, according to the runtime documentation, the program still crashes once all the other goroutines have exited, so this does not give you a clean shutdown.
Upvotes: 2
Reputation: 94769
Arlen Cuss has already written a good answer. I just want to suggest another design for your work queue. Instead of limiting the number of entries your channel can buffer, you can also just spawn a limited number of worker goroutines, which feels more natural imho. Something like this:
package main
import "fmt"
import "time"
import "math/rand"
func runTask(t string) string {
	start := time.Now()
	fmt.Println("starting task", t)
	time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
	fmt.Println("done running task", t, "in", time.Since(start))
	return t
}
func worker(in chan string, out chan string) {
	for t := range in {
		out <- runTask(t)
	}
}
func main() {
	numWorkers := 3
	// spawn workers
	in, out := make(chan string), make(chan string)
	for i := 0; i < numWorkers; i++ {
		go worker(in, out)
	}
	files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	// schedule tasks
	go func() {
		for _, f := range files {
			in <- f
		}
		close(in) // no more tasks; lets the workers' range loops end
	}()
	// get results
	for range files {
		<-out
	}
}
You can also use a sync.WaitGroup if you just want to wait until all tasks have been executed, but using an out channel has the advantage that you can aggregate the results later. For example, if each task returns the number of words in its file, the final loop can sum up the individual word counts.
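As a rough sketch of that aggregation idea, the worker pool below sums word counts received on the out channel. countWords and totalWords are hypothetical names, and the tasks count words in in-memory strings rather than real files, purely to keep the example self-contained:

```go
package main

import (
	"fmt"
	"strings"
)

// countWords is a stand-in task: it counts whitespace-separated words.
func countWords(text string) int {
	return len(strings.Fields(text))
}

// worker reads texts from in until it is closed and sends each count to out.
func worker(in <-chan string, out chan<- int) {
	for t := range in {
		out <- countWords(t)
	}
}

// totalWords (hypothetical helper) fans the texts out to numWorkers workers
// and sums the per-text counts as they arrive.
func totalWords(texts []string, numWorkers int) int {
	in, out := make(chan string), make(chan int)
	for i := 0; i < numWorkers; i++ {
		go worker(in, out)
	}
	// schedule tasks, then close in so the workers can return
	go func() {
		for _, t := range texts {
			in <- t
		}
		close(in)
	}()
	// collect exactly one result per text
	total := 0
	for range texts {
		total += <-out
	}
	return total
}

func main() {
	texts := []string{"one two", "three", "four five six"}
	fmt.Println(totalWords(texts, 2))
}
```

The results arrive in whatever order the workers finish, which does not matter for a sum. If the order mattered, each result would need to carry an identifier for its task.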
Upvotes: 6