Reputation: 4143
I am new to Golang and I have a task that I have implemented using WaitGroup
, and Mutex
which I would like to convert to use Channels
instead.
A very brief description of the task is this: spurn as many go routines as needed to processes a result and in the main go routine wait and collect all the results.
The implementation I have using WaitGroup
, and Mutex
is as follows:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func process(input int, wg *sync.WaitGroup, result *[]int, lock *sync.Mutex) *[]int {
defer wg.Done()
defer lock.Unlock()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
lock.Lock()
*result = append(*result, input * 10)
return result
}
func main() {
var wg sync.WaitGroup
var result []int
var lock sync.Mutex
for i := range []int{1,2,3,4,5} {
wg.Add(1)
go process(i, &wg, &result, &lock)
}
}
How do I replace the memory synchronization with the usage of Mutex
to one that uses Channels
?
My main problem is I am not sure how to determine the final go routine that is processing the final task and hence have that one be the one to close the channel
. The idea is that by closing the channel
the main go routine can loop over the channel
, retrieve the results and when it sees the channel
has been closed, it moves on.
It could also be that the approach to close the channel is the wrong one in this scenario, hence why I am asking here.
How would a more experienced go programmer solve this problem using channels
?
Upvotes: 3
Views: 5352
Reputation: 2913
I change your code to use the channel. there are many other ways to use the channel.
package main
import (
"fmt"
"math/rand"
"time"
)
func process(input int, out chan<- int) {
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
out <- input * 10
}
func main() {
var result []int
resultChan := make(chan int)
items := []int{1, 2, 3, 4, 5}
for _, v := range items {
go process(v, resultChan)
}
for i := 0; i < len(items); i++ {
res, _ := <-resultChan
result = append(result, res)
}
close(resultChan)
fmt.Println(result)
}
Update: (comment's answer)
if items count is unknown you need to signal the main to finish. otherwise "deadlock", you can create a channel to signal the main function to finish. also, you can use sync.waiteGroup
.
for panic in Goroutine, you can use defer and recover to handle errors . and you can create an error channel ore you can use x/sync/errgroup
.
There are so many solutions. and it depends on your problem. so there is no specific way to use goroutine, channel, and...
Upvotes: 1
Reputation: 8043
Here's a sample snippet where I am using a slice of channels instead of waitgroups to perform a fork-join:
package main
import (
"fmt"
"os"
)
type cStruct struct {
resultChan chan int
errChan chan error
}
func process(i int) (v int, err error) {
v = i
return
}
func spawn(i int) cStruct {
r := make(chan int)
e := make(chan error)
go func(i int) {
defer close(r)
defer close(e)
v, err := process(i)
if err != nil {
e <- err
return
}
r <- v
return
}(i)
return cStruct{
r,
e,
}
}
func main() {
//have a slice of channelStruct
var cStructs []cStruct
nums := []int{1, 2, 3, 4, 5}
for _, v := range nums {
cStruct := spawn(v)
cStructs = append(cStructs, cStruct)
}
//All the routines have been spawned, now iterate over the slice:
var results []int
for _, c := range cStructs {
rChan, errChan := c.resultChan, c.errChan
select {
case r := <-rChan:
{
results = append(results, r)
}
case err := <-errChan:
{
if err != nil {
os.Exit(1)
return
}
}
}
}
//All the work should be done by now, iterating over the results
for _, result := range results {
fmt.Println("Aggregated result:", result)
}
}
Upvotes: 2
Reputation: 4423
Here's a solution using WaitGroup
instead of waiting for a fixed number of results.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func process(input int, wg *sync.WaitGroup, resultChan chan<- int) {
defer wg.Done()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
resultChan <- input * 10
}
func main() {
var wg sync.WaitGroup
resultChan := make(chan int)
for i := range []int{1,2,3,4,5} {
wg.Add(1)
go process(i, &wg, resultChan)
}
go func() {
wg.Wait()
close(resultChan)
}()
var result []int
for r := range resultChan {
result = append(result, r)
}
fmt.Println(result)
}
Upvotes: 5