Reputation: 1
I'm trying to implement a pipeline in Go, but the program exits the main goroutine before the other goroutines have finished.
Please help me fix this using wait groups.
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func main() {
	c1 := make(chan string)
	c2 := make(chan string)
	go sender(c1)
	go removeDuplicates(c1, c2)
	go printer(c2)
	wg.Wait()
}

func sender(outputStream chan string) {
	wg.Add(1)
	wg.Done()
	for _, v := range []string{"one", "one", "two", "two", "three", "three"} {
		outputStream <- v
	}
	close(outputStream)
}

func removeDuplicates(inputStream, outputStream chan string) {
	wg.Add(1)
	wg.Done()
	temp := ""
	for v := range inputStream {
		if v != temp {
			outputStream <- v
			temp = v
		}
	}
	close(outputStream)
}

func printer(inputStream chan string) {
	wg.Add(1)
	wg.Done()
	for v := range inputStream {
		fmt.Println(v)
	}
}
When I used time.Sleep in this case, the program worked successfully.
Upvotes: 0
Views: 380
Reputation: 34282
The first rule of working with wait groups: don't call Add() from a goroutine you're going to wait for. Wait() may be called before Add() has run, in which case the counter is still zero and Wait() returns immediately, which is not what you expect. The second rule: call Done() at the end, not at the beginning. So, the fix would be:
func main() {
	c1 := make(chan string)
	c2 := make(chan string)
	wg.Add(3)
	go sender(c1)
	go removeDuplicates(c1, c2)
	go printer(c2)
	wg.Wait()
}

func sender(outputStream chan string) {
	defer wg.Done()
	for _, v := range []string{"one", "one", "two", "two", "three", "three"} {
		outputStream <- v
	}
	close(outputStream)
}

// etc for other two functions
Upvotes: 2