Reputation: 10413
Since Go does not have generics, all the premade solutions use type casting which I do not like very much. I also want to implement it on my own and tried the following code. However, sometimes it does not wait for all goroutines, am I closing the jobs channel prematurely? I do not have anything to fetch from them. I might have used a pseudo output channel too and waited to fetch the exact amount from them however I believe the following code should work too. What am I missing?
func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
Upvotes: 0
Views: 885
Reputation: 2884
You need to pass a pointer to the waitgroup, or else every job receives it's own copy.
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
See the difference here: without pointer, with pointer.
Upvotes: 1
Reputation: 120941
Call wg.Add outside of the goroutine and pass a pointer to the wait group.
If Add is called from inside the goroutine, it's possible for the main goroutine to call Wait before the goroutines get a chance to run. If Add has not been called, then Wait will return immediately.
Pass a pointer to the goroutine. Otherwise, the goroutines use their own copy of the wait group.
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
wg.Add(1)
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
Upvotes: 1