Reputation: 6231
I'm trying to run work concurrently with multiple goroutines. I pass in the number of "threads" to use for processing the files. files is a slice of strings to process.
queue := make(chan string)
threadCount := c.Int("threads")
if c.Int("threads") < len(files) {
    threadCount = len(files)
}
log.Infof("Starting %d processes", threadCount)
for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue)
}
for _, f := range files {
    queue <- f
}
close(queue)
And the routine itself looks like this:
func renderGoRoutine(queue chan string) {
    for file := range queue {
        // do some heavy lifting stuff with the file
    }
}
This works fine when I use just one thread. As soon as I use more than one, the program exits (leaves the scope) before all the goroutines are done.
How do I make it process everything?
Previous question: Using a channel for dispatching tasks to go routine
Upvotes: 0
Views: 140
Reputation: 6231
I forgot to wait for all tasks to finish. This can be done by waiting for every worker loop to end. Since close(queue) ends each goroutine's for ... range queue loop, a simple channel can be used for synchronization, like so:
// named done rather than sync so it does not shadow the sync package
done := make(chan bool)
queue := make(chan string)
threadCount := c.Int("threads")
if c.Int("threads") < len(files) {
    threadCount = len(files)
}
log.Infof("Starting %d processes", threadCount)
for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue, done)
}
for _, f := range files {
    queue <- f
}
close(queue)
// one signal per goroutine
for i := 0; i < threadCount; i++ {
    <-done
}
And last but not least, write to the channel once a goroutine's loop has ended:
func renderGoRoutine(queue chan string, done chan bool) {
    for file := range queue {
        // whatever is done here
    }
    done <- true
}
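As a self-contained illustration of the approach above, here is a minimal sketch. The names renderWorker and renderAll and the atomic counter are hypothetical stand-ins for the real rendering code, and the sketch assumes at least one worker is started.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// renderWorker drains the queue and then reports exactly once on done.
// The counter increment is a placeholder for the real heavy lifting.
func renderWorker(queue <-chan string, done chan<- bool, count *int64) {
	for range queue {
		atomic.AddInt64(count, 1)
	}
	done <- true
}

// renderAll starts the given number of workers (assumed >= 1), feeds them
// the files, and blocks until every worker has signalled completion.
func renderAll(files []string, workers int) int64 {
	var count int64
	queue := make(chan string)
	done := make(chan bool)
	for i := 0; i < workers; i++ {
		go renderWorker(queue, done, &count)
	}
	for _, f := range files {
		queue <- f
	}
	close(queue) // ends the range loop in every worker
	for i := 0; i < workers; i++ {
		<-done // one signal per worker
	}
	return atomic.LoadInt64(&count)
}

func main() {
	fmt.Println(renderAll([]string{"a.txt", "b.txt", "c.txt"}, 2)) // prints 3
}
```

Because each worker signals only after the queue has been closed and drained, the number of receives matches the number of workers, not the number of files.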
Upvotes: 1
Reputation: 1592
Using a sync.WaitGroup is an option.
At the beginning, add the number of tasks to the WaitGroup, and decrement its counter after each task is done. Then, at the end of your code flow, wait until all tasks are finished.
See the example: https://godoc.org/sync#WaitGroup
Your code will look like this:
queue := make(chan string)
var wg sync.WaitGroup
wg.Add(len(files)) // one count per file
threadCount := c.Int("threads")
if c.Int("threads") < len(files) {
    threadCount = len(files)
}
log.Infof("Starting %d processes", threadCount)
for i := 0; i < threadCount; i++ {
    // pass a pointer: a WaitGroup must not be copied
    go renderGoRoutine(queue, &wg)
}
for _, f := range files {
    queue <- f
}
close(queue)
wg.Wait()
renderGoRoutine:
func renderGoRoutine(queue chan string, wg *sync.WaitGroup) {
    for file := range queue {
        // do some heavy lifting stuff with the file
        // decrement the WaitGroup counter
        wg.Done()
    }
}
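A common variant, sketched here under assumptions, counts workers instead of files: each goroutine calls wg.Add(1) before it starts and wg.Done() when its loop ends. The function name renderWithWaitGroup and the rendered slice are hypothetical and only stand in for the real work.

```go
package main

import (
	"fmt"
	"sync"
)

// renderWithWaitGroup processes files with the given number of workers
// (assumed >= 1) and returns once every worker has finished.
func renderWithWaitGroup(files []string, workers int) []string {
	queue := make(chan string)
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards rendered
		rendered []string
	)
	for i := 0; i < workers; i++ {
		wg.Add(1) // one count per worker, added before the goroutine starts
		go func() {
			defer wg.Done() // runs when the range loop ends
			for file := range queue {
				// placeholder for the heavy lifting
				mu.Lock()
				rendered = append(rendered, file)
				mu.Unlock()
			}
		}()
	}
	for _, f := range files {
		queue <- f
	}
	close(queue)
	wg.Wait() // blocks until all workers have called Done
	return rendered
}

func main() {
	out := renderWithWaitGroup([]string{"a.txt", "b.txt", "c.txt"}, 2)
	fmt.Println(len(out))
}
```

Counting workers keeps the Add and Done calls next to each other, so the counter cannot drift if the number of files changes.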
Upvotes: 2
Reputation: 5770
You are using the channel to publish work to be done. As soon as the last item is taken from the queue (not when it has finished processing), your program exits.
You could write to a second channel in renderGoRoutine each time a file has been processed, to signal the end of processing.
At the top (buffered, so a worker never blocks on the signal while the main goroutine is still filling the queue; named done so it does not shadow the sync package):
done := make(chan bool, len(files))
In renderGoRoutine, after each file has been processed (assuming the channel is visible there, for example because it is passed in as a parameter):
done <- true
At the bottom:
for range files {
    <-done
}
Now your program waits until all files have been processed.
Or, to have a complete example:
queue := make(chan string)
done := make(chan bool, len(files))
threadCount := c.Int("threads")
if c.Int("threads") < len(files) {
    threadCount = len(files)
}
log.Infof("Starting %d processes", threadCount)
for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue, done)
}
for _, f := range files {
    queue <- f
}
close(queue)
// one signal per file
for range files {
    <-done
}
And the routine should be changed like this:
func renderGoRoutine(queue chan string, done chan bool) {
    for file := range queue {
        // do some heavy lifting stuff with the file
        done <- true
    }
}
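The per-file signal needs some care when there are fewer workers than files: with an unbuffered signal channel, every worker can end up blocked on its send while the main goroutine is still blocked on the queue. The sketch below avoids that by buffering the signal channel. The function name renderSignalPerFile is hypothetical, and the sketch assumes at least one worker.

```go
package main

import "fmt"

// renderSignalPerFile sends one completion signal per processed file and
// returns the number of signals received. It assumes workers >= 1.
func renderSignalPerFile(files []string, workers int) int {
	queue := make(chan string)
	// Buffered to len(files): a worker can always record completion
	// and go back to the queue without waiting for a receiver.
	done := make(chan bool, len(files))
	for i := 0; i < workers; i++ {
		go func() {
			for range queue {
				// placeholder for the heavy lifting
				done <- true
			}
		}()
	}
	for _, f := range files {
		queue <- f
	}
	close(queue)
	finished := 0
	for range files {
		<-done // one receive per file
		finished++
	}
	return finished
}

func main() {
	fmt.Println(renderSignalPerFile([]string{"a", "b", "c", "d", "e"}, 2))
}
```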
Upvotes: 1