Cmag

Reputation: 15750

Wait for concurrent workers to finish before exiting

The following code has an obvious problem: the program exits before the workers have finished all of their work.

The worker goroutines are launched before the sender starts sending data, and that must stay as it is. Starting them from the sender function is not an option: it would be the easy fix, but I want to learn the more involved synchronization technique.

What would be the correct way to wait for the workers to finish?

I have tried closing the worker1CH and worker2CH channels, as well as adding a dedicated sync.WaitGroup to each worker.

package main

import (
    "log"
    "math/rand"
    "sync"
)

func main() {

    worker1CH := make(chan int, 1)
    worker2CH := make(chan int, 1)

    // worker for even numbers
    go func(in chan int) {
        for i := range in {
            log.Print(i)
        }
    }(worker1CH)

    // worker for odd numbers
    go func(in chan int) {
        for i := range in {
            log.Print(i)
        }
    }(worker2CH)

    // sender which sends even numbers to worker1CH, and odd numbers to worker2CH
    var wg sync.WaitGroup
    wg.Add(1)
    go func(wg *sync.WaitGroup, evenChan chan int, oddChan chan int) {
        defer wg.Done()

        data := rand.Perm(10)
        for _, i := range data {
            switch i%2 {
            case 0:
                evenChan <- i
            default:
                oddChan <- i
            }
        }
    }(&wg, worker1CH, worker2CH)
    wg.Wait()

}

Upvotes: 0

Views: 1649

Answers (3)

panchaldaxesh

Reputation: 1

Since your sender sends a fixed number of values, it will finish on its own. Once it returns, you can close the channels so that the readers' loops end, and then wait for the readers.

package main

import (
    "log"
    "math/rand"
    "sync"
)

func reader(in chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range in {
        log.Print(i)
    }
}

func main() {
    var wg sync.WaitGroup

    worker1CH := make(chan int, 1)
    worker2CH := make(chan int, 1)

    wg.Add(1)
    // worker for even numbers
    go reader(worker1CH, &wg)

    wg.Add(1)
    // worker for odd numbers
    go reader(worker2CH, &wg)

    // sender which sends even numbers to worker1CH, and odd numbers to worker2CH
    sender(worker1CH, worker2CH)

    close(worker2CH)
    close(worker1CH)
    wg.Wait()

}

func sender(evenChan chan int, oddChan chan int) {
    data := rand.Perm(10)
    for _, i := range data {
        switch i % 2 {
        case 0:
            evenChan <- i
        default:
            oddChan <- i
        }
    }
}

Playground link: https://play.golang.org/p/JJ9ngCHUvbS

Upvotes: 0

user13631587

Reputation:

Wait for the two receiving goroutines to complete using a wait group. Use one wait group to wait for both goroutines.

Close the channels after sending all values so that the loops in the receiving goroutines exit.

There's no need to wait for the sending goroutine. The receiving goroutines exit only after the sender closes the channels, so the sender completes all of its work before the receivers complete.

worker1CH := make(chan int, 1)
worker2CH := make(chan int, 1)

var wg sync.WaitGroup
wg.Add(2)  // <-- wait for the two receiving goroutines.

// worker for even numbers
go func(wg *sync.WaitGroup, in chan int) {
    defer wg.Done() // <-- add this line
    for i := range in {
        log.Print(i)
    }
}(&wg, worker1CH)

// worker for odd numbers
go func(wg *sync.WaitGroup, in chan int) {
    defer wg.Done() // <-- add this line
    for i := range in {
        log.Print(i)
    }
}(&wg, worker2CH)

// sender which sends even numbers to worker1CH, and odd numbers to worker2CH
go func(evenChan chan int, oddChan chan int) {

    defer close(evenChan) // <-- close channel so that receiver exits loop
    defer close(oddChan)  // <-- ditto

    data := rand.Perm(10)
    for _, i := range data {
        switch i % 2 {
        case 0:
            evenChan <- i
        default:
            oddChan <- i
        }
    }
}(worker1CH, worker2CH)

wg.Wait()

Run the example on the Go Playground.
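
For anyone who wants to check the behavior rather than read log output, here is a minimal self-contained sketch of the same pattern. It is not the code from the question: the helper name distribute and the idea of collecting the values into slices are my own assumptions for illustration, and the input is a fixed slice instead of rand.Perm so the result is predictable.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute is a hypothetical helper (not from the question): it routes even
// values to one worker and odd values to another, then waits for both workers.
func distribute(data []int) (evens, odds []int) {
	evenCh := make(chan int, 1)
	oddCh := make(chan int, 1)

	var wg sync.WaitGroup
	wg.Add(2) // one count per receiving goroutine

	// Workers are started before anything is sent, as the question requires.
	go func() {
		defer wg.Done()
		for v := range evenCh { // loop ends once evenCh is closed and drained
			evens = append(evens, v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range oddCh {
			odds = append(odds, v)
		}
	}()

	// Sender: closes both channels when it is done so the workers' loops exit.
	go func() {
		defer close(evenCh)
		defer close(oddCh)
		for _, v := range data {
			if v%2 == 0 {
				evenCh <- v
			} else {
				oddCh <- v
			}
		}
	}()

	wg.Wait() // returns only after both workers have drained their channels
	return evens, odds
}

func main() {
	evens, odds := distribute([]int{3, 0, 4, 1, 2})
	fmt.Println(evens, odds) // prints [0 4 2] [3 1]
}
```

Each slice is written by exactly one worker and read only after wg.Wait() returns, so no extra locking is needed in this sketch.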

Upvotes: 2

Cmag

Reputation: 15750

I was able to solve this by creating worker1Done and worker2Done channels and then waiting on them for the work to finish.

I also had to add close(evenChan) and close(oddChan) to the sender function to avoid the "fatal error: all goroutines are asleep - deadlock!" error.

package main

import (
    "log"
    "math/rand"
    "sync"
)

func main() {

    worker1CH := make(chan int, 1)
    worker2CH := make(chan int, 1)

    worker1Done := make(chan bool)
    worker2Done := make(chan bool)

    // worker for even numbers
    go func(in chan int, done chan bool) {
        for i := range in {
            log.Print(i)
        }
        done <- true
    }(worker1CH, worker1Done)

    // worker for odd numbers
    go func(in chan int, done chan bool) {
        for i := range in {
            log.Print(i)
        }
        done <- true
    }(worker2CH, worker2Done)

    // sender which sends even numbers to worker1CH, and odd numbers to worker2CH
    var wg sync.WaitGroup
    wg.Add(1)
    go func(wg *sync.WaitGroup, evenChan chan int, oddChan chan int) {
        defer wg.Done()

        data := rand.Perm(10)
        for _, i := range data {
            switch i%2 {
            case 0:
                evenChan <- i
            default:
                oddChan <- i
            }
        }

        close(evenChan)
        close(oddChan)

    }(&wg, worker1CH, worker2CH)
    wg.Wait()

    // block until each worker signals that it has drained its channel
    <-worker1Done
    <-worker2Done

}

Upvotes: 0
