bemeyer

Reputation: 6231

Goroutines end before they are done

I'm trying to execute things asynchronously with multiple goroutines. I pass in the number of "threads" to use to process the files. files is a slice of strings to process.

queue := make(chan string)

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
} 

log.Infof("Starting %d processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue)
}

for _, f := range files {
    queue <- f
}
close(queue)

And the routine itself looks like this:

func renderGoRoutine(queue chan string) {
    for file := range queue { 
        // do some heavy lifting stuff with the file
    }
}

This works fine when I use just one thread. As soon as I use more than one, the program exits (leaves the scope) before all the goroutines are done.

How do I make it process everything?

Previous question: Using a channel for dispatching tasks to go routine

Upvotes: 0

Views: 140

Answers (3)

bemeyer

Reputation: 6231

I forgot to wait for all tasks to finish. This can be done by waiting for every worker loop to end. Since close(queue) ends each for ... range loop once the channel has been drained, a second channel can be used for synchronization like so:

// done carries one signal per goroutine; it is passed in explicitly
// so that renderGoRoutine can actually see it.
done := make(chan bool)
queue := make(chan string)

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
}

log.Infof("Starting %d processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue, done)
}

for _, f := range files {
    queue <- f
}
close(queue)

// Wait for one signal per goroutine.
for i := 0; i < threadCount; i++ {
    <-done
}

And last but not least, write to the channel once the loop has ended:

func renderGoRoutine(queue chan string, done chan bool) {
    for file := range queue {
        // whatever is done here
    }
    // The queue is closed and drained: report that this goroutine is finished.
    done <- true
}
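
For reference, the same idea as a self-contained program. This is only a sketch: processWithDoneChannel is a made-up name, and the atomic counter is a stand-in for the real rendering work so that the result can be checked. It assumes workerCount is at least 1, otherwise sending to the unbuffered queue would block forever.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// processWithDoneChannel (hypothetical helper) starts workerCount goroutines,
// feeds them every file, and returns only after each worker has signalled on
// done. It returns the number of files that were processed.
func processWithDoneChannel(files []string, workerCount int) int64 {
	queue := make(chan string)
	done := make(chan bool)
	var processed int64

	for i := 0; i < workerCount; i++ {
		go func() {
			for range queue {
				// Stand-in for the heavy lifting on one file.
				atomic.AddInt64(&processed, 1)
			}
			// The queue is closed and drained: this worker is finished.
			done <- true
		}()
	}

	for _, f := range files {
		queue <- f
	}
	close(queue) // lets every "for range queue" loop terminate

	// One receive per worker, not per file.
	for i := 0; i < workerCount; i++ {
		<-done
	}
	return atomic.LoadInt64(&processed)
}

func main() {
	files := []string{"a.txt", "b.txt", "c.txt", "d.txt"}
	fmt.Println(processWithDoneChannel(files, 2)) // prints 4
}
```

Because each worker sends exactly one value after its loop ends, the number of receives has to match the number of goroutines started, not the number of files.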

Upvotes: 1

maksadbek

Reputation: 1592

Using WaitGroups is an option.

At the beginning, you add the number of tasks to the WaitGroup, and after each task is done you decrement the counter in the WaitGroup. At the end of your code flow, you wait until all tasks are finished.

See the example: https://godoc.org/sync#WaitGroup

Your code will look like this:

queue := make(chan string)

var wg sync.WaitGroup
wg.Add(len(files)) // one count per file

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
}

log.Infof("Starting %d processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    // Pass a pointer: a WaitGroup must not be copied after first use.
    go renderGoRoutine(queue, &wg)
}

for _, f := range files {
    queue <- f
}

close(queue)
wg.Wait() // blocks until wg.Done() has been called once per file

renderGoRoutine:

func renderGoRoutine(queue chan string, wg *sync.WaitGroup) {
    for file := range queue {
        // do some heavy lifting stuff with the file
        // decrement the WaitGroup counter
        wg.Done()
    }
}
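
A common variation, sketched here as a self-contained program, counts goroutines instead of files: call wg.Add(1) before starting each worker and defer wg.Done() inside it. The name processWithWaitGroup and the atomic counter standing in for the real work are illustrative assumptions, and workerCount is assumed to be at least 1.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processWithWaitGroup (hypothetical helper) runs workerCount goroutines over
// files and returns the number of files processed once all workers have exited.
func processWithWaitGroup(files []string, workerCount int) int64 {
	queue := make(chan string)
	var wg sync.WaitGroup
	var processed int64

	for i := 0; i < workerCount; i++ {
		wg.Add(1) // register the worker before it starts
		go func() {
			defer wg.Done() // runs when the range loop below ends
			for range queue {
				// Stand-in for the heavy lifting on one file.
				atomic.AddInt64(&processed, 1)
			}
		}()
	}

	for _, f := range files {
		queue <- f
	}
	close(queue) // no more work: the workers' loops terminate

	wg.Wait() // block until every worker has called Done
	return atomic.LoadInt64(&processed)
}

func main() {
	files := []string{"a.txt", "b.txt", "c.txt"}
	fmt.Println(processWithWaitGroup(files, 2)) // prints 3
}
```

Counting workers rather than files keeps Add and Done paired in one place, so the counter cannot drift if a file is skipped inside the loop.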

Upvotes: 2

mbuechmann

Reputation: 5770

You are using the channel to publish work to be done. As soon as the last item has been taken from the queue (but not yet finished processing), nothing blocks the sending code any longer, so your program exits.

You could use a second channel that renderGoRoutine writes to after each file to signal the end of processing.

At the top (buffered, so that the goroutines never block on the signal while files are still being queued):

done := make(chan bool, len(files))

In renderGoRoutine, after each file has been processed (with done passed in as a second parameter):

done <- true

At the bottom:

for range files {
    <-done
}

Now your program waits until all files are processed.

Or to have a complete example:

queue := make(chan string)
// Buffered with one slot per file; with an unbuffered channel the
// goroutines would block on the signal while the files are still
// being queued, and the program would deadlock.
done := make(chan bool, len(files))

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
}

log.Infof("Starting %d processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue, done)
}

for _, f := range files {
    queue <- f
}
close(queue)

// Wait for one signal per file.
for range files {
    <-done
}

And the routine should be changed like this:

func renderGoRoutine(queue chan string, done chan bool) {
    for file := range queue {
        // do some heavy lifting stuff with the file
        done <- true
    }
}
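
To try the per-file signalling in isolation, here is a minimal self-contained sketch. The name processWithPerFileSignal is made up for illustration, the worker body does no real work, and workerCount is assumed to be at least 1. The signal channel is buffered with one slot per file, which is an assumption of this sketch that keeps the workers from blocking on the signal while files are still being queued.

```go
package main

import "fmt"

// processWithPerFileSignal (hypothetical helper) has each worker send one
// signal per finished file and returns the number of signals received.
func processWithPerFileSignal(files []string, workerCount int) int {
	queue := make(chan string)
	// One buffer slot per file, so a worker can always deliver its signal.
	done := make(chan bool, len(files))

	for i := 0; i < workerCount; i++ {
		go func() {
			for range queue {
				// Stand-in for the heavy lifting, then report this file.
				done <- true
			}
		}()
	}

	for _, f := range files {
		queue <- f
	}
	close(queue)

	// Wait for exactly one signal per file.
	finished := 0
	for range files {
		<-done
		finished++
	}
	return finished
}

func main() {
	files := []string{"a.txt", "b.txt", "c.txt"}
	fmt.Println(processWithPerFileSignal(files, 1)) // prints 3
}
```

With a single worker and an unbuffered signal channel, the worker would block on its first signal while the sender blocks on the second file, so neither side could make progress.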

Upvotes: 1
