Finlay Weber

Reputation: 4143

How to use channels to gather response from various goroutines

I am new to Go and have a task that I implemented using WaitGroup and Mutex, which I would like to convert to use channels instead.

A very brief description of the task: spawn as many goroutines as needed to process the inputs, and in the main goroutine wait for and collect all the results.

The implementation I have using WaitGroup, and Mutex is as follows:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func process(input int, wg *sync.WaitGroup, result *[]int, lock *sync.Mutex) {
    defer wg.Done()

    rand.Seed(time.Now().UnixNano())
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)

    // Hold the lock only while appending to the shared slice.
    lock.Lock()
    defer lock.Unlock()
    *result = append(*result, input*10)
}

func main() {
    var wg sync.WaitGroup
    var result []int
    var lock sync.Mutex

    // Range over the values, not the indices.
    for _, v := range []int{1, 2, 3, 4, 5} {
        wg.Add(1)
        go process(v, &wg, &result, &lock)
    }

    // Wait for every goroutine to finish before reading the results.
    wg.Wait()
    fmt.Println(result)
}

How do I replace the Mutex-based memory synchronization with one that uses channels?

My main problem is that I am not sure how to determine which goroutine is processing the final task, so that it can be the one to close the channel. The idea is that the main goroutine loops over the channel and retrieves the results, and when it sees that the channel has been closed, it moves on.

It could also be that closing the channel is the wrong approach in this scenario, which is why I am asking here.

How would a more experienced Go programmer solve this problem using channels?

Upvotes: 3

Views: 5352

Answers (3)

Mohammad Rajabloo

Reputation: 2913

I changed your code to use a channel. There are many other ways to use channels.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func process(input int, out chan<- int) {
    rand.Seed(time.Now().UnixNano())
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)
    out <- input * 10
}

func main() {
    var result []int
    resultChan := make(chan int)
    items := []int{1, 2, 3, 4, 5}

    for _, v := range items {
        go process(v, resultChan)
    }

    // Receive exactly one result per item; the count is known up front.
    for i := 0; i < len(items); i++ {
        res := <-resultChan
        result = append(result, res)
    }

    close(resultChan)
    fmt.Println(result)
}

Update (answering the comments):

If the number of items is unknown, something has to signal main that the work is finished; otherwise main blocks forever on the receive and the program ends in a deadlock. You can create a channel to signal the main function to finish, or you can use sync.WaitGroup.

For a panic in a goroutine, you can use defer and recover to handle it. For errors, you can create an error channel or use golang.org/x/sync/errgroup.

There are many solutions, and the right one depends on your problem, so there is no single way to use goroutines, channels, and so on.

Upvotes: 1

sc_ray

Reputation: 8043

Here's a sample snippet where I am using a slice of channels instead of waitgroups to perform a fork-join:

package main

import (
    "fmt"
    "os"
)

type cStruct struct {
    resultChan chan int
    errChan    chan error
}

func process(i int) (v int, err error) {
    v = i
    return
}

func spawn(i int) cStruct {
    r := make(chan int)
    e := make(chan error)
    go func(i int) {
        defer close(r)
        defer close(e)
        v, err := process(i)
        if err != nil {
            e <- err
            return
        }
        r <- v
        return
    }(i)
    return cStruct{
        r,
        e,
    }
}

func main() {
    // Keep one cStruct (a result/error channel pair) per goroutine.
    var cStructs []cStruct
    nums := []int{1, 2, 3, 4, 5}
    for _, v := range nums {
        cStructs = append(cStructs, spawn(v))
    }
    // All the goroutines have been spawned; now collect from each one in order.
    var results []int
    for _, c := range cStructs {
        select {
        case r := <-c.resultChan:
            results = append(results, r)
        case err := <-c.errChan:
            if err != nil {
                os.Exit(1)
            }
        }
    }
    // All the work is done by now; print the aggregated results.
    for _, result := range results {
        fmt.Println("Aggregated result:", result)
    }
}

Upvotes: 2

chash

Reputation: 4423

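Here's a solution using WaitGroup instead of waiting for a fixed number of results. No worker has to know whether it is the last one: a separate goroutine waits on the WaitGroup and closes the channel once all workers are done, so main can simply range over the channel.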

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func process(input int, wg *sync.WaitGroup, resultChan chan<- int) {
    defer wg.Done()

    rand.Seed(time.Now().UnixNano())
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)

    resultChan <- input * 10
}

func main() {
    var wg sync.WaitGroup

    resultChan := make(chan int)

    // Range over the values, not the indices.
    for _, v := range []int{1, 2, 3, 4, 5} {
        wg.Add(1)
        go process(v, &wg, resultChan)
    }

    // Close the channel once every worker has finished, so that the
    // range loop below terminates.
    go func() {
        wg.Wait()
        close(resultChan)
    }()

    var result []int
    for r := range resultChan {
        result = append(result, r)
    }

    fmt.Println(result)
}

Upvotes: 5
