Kiril

Reputation: 6229

How to wait until buffered channel (semaphore) is empty?

I have a slice of integers, which are manipulated concurrently:

ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

I'm using a buffered channel as a semaphore to put an upper bound on the number of concurrently running goroutines:

sem := make(chan struct{}, 2)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}

The code above works well until the last one or two integers are reached: the program ends before those last goroutines have finished.

Question: how do I wait for the buffered channel to drain?

Upvotes: 20

Views: 22944

Answers (5)

Zombo

Reputation: 1

Here is a working example. The for loop at the end refills the channel to capacity, which forces the program to wait until the jobs are done:

package main
import "time"

func w(n int, e chan error) {
   // do something
   println(n)
   time.Sleep(time.Second)
   // release semaphore
   <-e
}

func main() {
   a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
   e := make(chan error, 2)
   for _, n := range a {
      // acquire semaphore
      e <- nil
      // start long running go routine
      go w(n, e)
   }
   for n := cap(e); n > 0; n-- {
      e <- nil
   }
}
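To see why the final loop is enough, it helps to count tokens: main sends one token per job plus cap(sem) more, and only workers receive. The buffer never holds more than cap(sem) tokens, so the last send can only complete once every worker has released. The sketch below checks this with a counter. The helper name runAndDrain and the 10 ms sleep are assumptions made for illustration, not part of the answer above.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runAndDrain (hypothetical helper) starts one goroutine per job, at most
// limit at a time, waits by filling the semaphore to capacity, and returns
// how many jobs had finished by the time that wait returned.
func runAndDrain(jobs []int, limit int) int64 {
	var finished int64
	sem := make(chan struct{}, limit)

	for _, id := range jobs {
		sem <- struct{}{} // acquire: blocks while limit goroutines are running
		go func(id int) {
			_ = id                            // real work would use id
			time.Sleep(10 * time.Millisecond) // stand-in for real work
			atomic.AddInt64(&finished, 1)
			<-sem // release
		}(id)
	}

	// Wait: take every slot. No new workers are started from here on, so
	// these sends can only all succeed after each worker has released.
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
	return atomic.LoadInt64(&finished)
}

func main() {
	jobs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fmt.Println(runAndDrain(jobs, 2) == int64(len(jobs))) // prints true
}
```

Note that the channel is left full afterwards, so it cannot be reused as a semaphore without draining it first.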

Upvotes: -1

DBZ7

Reputation: 57

You can wait for your "sub-goroutines" from the current goroutine with a for loop that re-acquires every semaphore slot:

semLimit := 2
sem := make(chan struct{}, semLimit)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}

// wait: re-acquire every slot, which blocks until all goroutines have released
for i := 0; i < semLimit; i++ {
  sem <- struct{}{}
}

Optionally, you can also write a minimal "semaphored wait group" on top of the channel, which avoids importing sync:

semLimit := 2
// mini semaphored waitgroup 
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i++ { wgAdd() } }

for _, i := range ints {
  // acquire semaphore
  wgAdd()

  // start long running goroutine
  go func(id int) {
    // do something

    // release semaphore
    wgDone()
  }(i)
}

// wait semaphore
wgWait()

Upvotes: 0

Justin S

Reputation: 75

Use a "worker pool" to process your data. It is cheaper than starting a goroutine for each int, allocating memory for the variables inside it, and so on.

ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

ch := make(chan int)

var wg sync.WaitGroup

// run worker pool
for i := 2; i > 0; i-- {
    wg.Add(1)

    go func() {
        defer wg.Done()

        for id := range ch {
            // do something
            fmt.Println(id)
        }
    }()
}

// send ints to workers
for _, i := range ints {
    ch <- i
}

close(ch)

wg.Wait()
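As a complete program, the same pool might look like the sketch below. The helper name sumSquares, the squaring step, and the mutex-protected total are assumptions added so that the result can be checked; they stand in for "do something".

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares (hypothetical helper) starts a fixed number of workers that
// read ids from a channel, square them, and add them to a shared total.
func sumSquares(ints []int, workers int) int {
	ch := make(chan int)
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)

	// run worker pool
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range ch { // loop ends once ch is closed and drained
				sq := id * id
				mu.Lock()
				total += sq
				mu.Unlock()
			}
		}()
	}

	// send ints to workers
	for _, i := range ints {
		ch <- i
	}
	close(ch) // tell the workers there is no more input

	wg.Wait() // every worker has returned, so total is final
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 2)) // prints 385
}
```

The number of workers is the concurrency limit here, so no separate semaphore is needed.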

Upvotes: 5

d81hack

Reputation: 11

Clearly, nothing is waiting for your goroutines to complete, so the program ends before the last two goroutines have finished. You can use a sync.WaitGroup to wait for all of your goroutines to complete before the program ends. This post explains it better: https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/
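A minimal sketch of that idea, leaving out the concurrency limit from the question: call Add before starting each goroutine, Done when it finishes, and Wait before returning. The helper name doubleAll and the doubling step are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// doubleAll (hypothetical helper) starts one goroutine per element. Each
// goroutine writes only to its own slot of out, so no locking is needed.
func doubleAll(ints []int) []int {
	out := make([]int, len(ints))
	var wg sync.WaitGroup

	for idx, v := range ints {
		wg.Add(1) // register the goroutine before it starts
		go func(idx, v int) {
			defer wg.Done() // signal completion even on early return
			out[idx] = v * 2
		}(idx, v)
	}

	wg.Wait() // block until every goroutine has called Done
	return out
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3})) // prints [2 4 6]
}
```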

Upvotes: 0

Mr_Pink

Reputation: 109440

You can't use a semaphore (a channel in this case) in that manner. There's no guarantee it won't be empty at any point while you are processing values and dispatching more goroutines. That's not a concern in this case specifically since you're dispatching work synchronously, but because there's no race-free way to check a channel's length, there's no primitive to wait for a channel's length to reach 0.

Use a sync.WaitGroup to wait for all goroutines to complete:

sem := make(chan struct{}, 2)

var wg sync.WaitGroup

for _, i := range ints {
    wg.Add(1)
    // acquire semaphore
    sem <- struct{}{}
    // start long running go routine
    go func(id int) {
        defer wg.Done()
        // do something
        // release semaphore
        <-sem
    }(i)
}

wg.Wait()
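A runnable version of the same pattern could look like the sketch below, which also records the peak number of goroutines running at once so that the limit can be checked. The helper name runBounded, the counters, and the 10 ms sleep are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded (hypothetical helper) runs one goroutine per int, at most limit
// at a time, and returns the number of completed jobs and the peak number
// of jobs that were running at the same moment.
func runBounded(ints []int, limit int) (int64, int64) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak, done int64

	for _, i := range ints {
		wg.Add(1)
		sem <- struct{}{} // acquire semaphore
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // release semaphore (runs before wg.Done)

			n := atomic.AddInt64(&running, 1)
			for { // raise peak to n if n is a new maximum
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}

			time.Sleep(10 * time.Millisecond) // stand-in for real work

			atomic.AddInt64(&running, -1)
			atomic.AddInt64(&done, 1)
		}(i)
	}

	wg.Wait() // all goroutines have finished
	return atomic.LoadInt64(&done), atomic.LoadInt64(&peak)
}

func main() {
	done, peak := runBounded([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 2)
	fmt.Println(done == 10, peak <= 2) // prints true true
}
```

The channel only limits concurrency and the WaitGroup only handles waiting, so neither has to be inspected for its length.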

Upvotes: 26
