Reputation: 6229
I have a slice of integers, which are manipulated concurrently:
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
I'm using a buffered channel as semaphore in order to have an upper bound of concurrently running go routines:
sem := make(chan struct{}, 2)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
The code above works pretty well until the last or last two integers are reached, because the program ends before those last go routines are finished.
Question: how do I wait for the buffered channel to drain?
Upvotes: 20
Views: 22944
Reputation: 1
Here is a working example. The for
loop at the end forces the program to wait
until the jobs are done:
package main
import "time"
func w(n int, e chan error) {
// do something
println(n)
time.Sleep(time.Second)
// release semaphore
<-e
}
func main() {
a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
e := make(chan error, 2)
for _, n := range a {
// acquire semaphore
e <- nil
// start long running go routine
go w(n, e)
}
for n := cap(e); n > 0; n-- {
e <- nil
}
}
Upvotes: -1
Reputation: 57
You can wait your "sub-goroutines" with the current goroutine in a for loop
semLimit := 2
sem := make(chan struct{}, semLimit)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
// wait semaphore
for i := 0; i < semLimit; i++ {
wg<-struct{}{}
}
Optionnaly it is also possible to program a minimalistic "semaphored waitgroup" with the economy of import sync
semLimit := 2
// mini semaphored waitgroup
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i++ { wgAdd() } }
for _, i := range ints {
// acquire semaphore
wgAdd()
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
wgDone()
}(i, sem)
}
// wait semaphore
wgWait()
Upvotes: 0
Reputation: 75
Use "worker pool" to process you data. It is cheeper than run goroutine for each int, allocate memory for variables inside it and so on...
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
ch := make(chan int)
var wg sync.WaitGroup
// run worker pool
for i := 2; i > 0; i-- {
wg.Add(1)
go func() {
defer wg.Done()
for id := range ch {
// do something
fmt.Println(id)
}
}()
}
// send ints to workers
for _, i := range ints {
ch <- i
}
close(ch)
wg.Wait()
Upvotes: 5
Reputation: 11
Clearly there is no one waiting for your go-routines to complete. Thus the program ends before the last 2 go-routines are completed. You may use a workgroup to wait for all your go-routines complete before the program ends. This tells it better - https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/
Upvotes: 0
Reputation: 109440
You can't use a semaphore (channel in this case) in that manner. There's no guarantee it won't be empty any point while you are processing values and dispatching more goroutines. That's not a concern in this case specifically since you're dispatching work synchronously, but because there's no race-free way to check a channel's length, there's no primitive to wait for a channel's length to reach 0.
Use a sync.WaitGroup
to wait for all goroutines to complete
sem := make(chan struct{}, 2)
var wg sync.WaitGroup
for _, i := range ints {
wg.Add(1)
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int) {
defer wg.Done()
// do something
// release semaphore
<-sem
}(i)
}
wg.Wait()
Upvotes: 26