sv_dev

Reputation: 1

How to use sync.WaitGroup in pipeline

I'm trying to implement a pipeline in Go, but the program exits the main goroutine before the other goroutines have finished.

Please help me fix this using wait groups.

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func main() {
    c1 := make(chan string)
    c2 := make(chan string)

    go sender(c1)
    go removeDuplicates(c1, c2)
    go printer(c2)

    wg.Wait()
}

func sender(outputStream chan string) {
    wg.Add(1)
    wg.Done()

    for _, v := range []string{"one", "one", "two", "two", "three", "three"} {
        outputStream <- v
    }

    close(outputStream)
}

func removeDuplicates(inputStream, outputStream chan string) {
    wg.Add(1)
    wg.Done()

    temp := ""

    for v := range inputStream {
        if v != temp {
            outputStream <- v
            temp = v
        }
    }

    close(outputStream)
}

func printer(inputStream chan string) {
    wg.Add(1)
    wg.Done()

    for v := range inputStream {
        fmt.Println(v)
    }
}

When I used time.Sleep at the end of main instead, the program worked as expected.

Upvotes: 0

Views: 380

Answers (1)

bereal

Reputation: 34282

The first rule of working with wait groups: don't call Add() from a goroutine you're going to wait for. Otherwise Wait() may be called before Add() has run, in which case the counter is still zero and Wait() returns immediately, which is not what you expect. The second rule: call Done() at the end, not at the beginning. So, the fix would be:

func main() {
    c1 := make(chan string)
    c2 := make(chan string)

    wg.Add(3)

    go sender(c1)
    go removeDuplicates(c1, c2)
    go printer(c2)

    wg.Wait()
}

func sender(outputStream chan string) {
    defer wg.Done()

    for _, v := range []string{"one", "one", "two", "two", "three", "three"} {
        outputStream <- v
    }

    close(outputStream)
}

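// The other two functions change the same way: drop the leading
// wg.Add(1) / wg.Done() pair and start each with defer wg.Done().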

Upvotes: 2
