Mustafa

Reputation: 10413

Implementing a job worker pool in Go

Since Go does not have generics, all the premade solutions rely on type casting, which I do not like very much. I also want to implement this on my own, so I tried the following code. However, it sometimes does not wait for all goroutines to finish. Am I closing the jobs channel prematurely? I have no results to fetch from the workers. I could have used a dummy output channel and waited to receive exactly as many values as there are jobs, but I believe the following code should work too. What am I missing?

func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        go jobWorker(w, jobs, wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}

Upvotes: 0

Views: 885

Answers (2)

fl0cke

Reputation: 2884

You need to pass a pointer to the WaitGroup, or else every worker goroutine receives its own copy.

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        go jobWorker(w, jobs, &wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}

See the difference here: without pointer, with pointer.

Upvotes: 1

Thundercat

Reputation: 120941

Call wg.Add outside of the goroutine and pass a pointer to the wait group.

If Add is called from inside the goroutine, it's possible for the main goroutine to call Wait before the goroutines get a chance to run. If Add has not been called, then Wait will return immediately.

Pass a pointer to the wait group to each goroutine. Otherwise, each goroutine operates on its own copy of the wait group, and the Done calls never reach the one that Wait is blocked on.

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {

    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        wg.Add(1)
        go jobWorker(w, jobs, &wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}
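
To check the pattern in isolation, here is a minimal runnable sketch of the same structure. The names worker, processAll and countProcessed, and the handle callback, are hypothetical stand-ins for ParseItem, SaveItem and MarkJobCompleted from the question; they are not part of the original code. It assumes at least one worker, since sending on the unbuffered channel with no receivers would block forever.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker drains the jobs channel until it is closed, then signals the
// shared WaitGroup. handle is a hypothetical stand-in for the real
// per-job work (parsing, saving, marking as completed).
func worker(jobs <-chan string, wg *sync.WaitGroup, handle func(string)) {
	defer wg.Done()
	for job := range jobs {
		handle(job)
	}
}

// processAll runs handle on every job using a fixed number of workers
// and returns only after all workers have finished.
// Assumption: workers >= 1, otherwise the first send blocks forever.
func processAll(list []string, workers int, handle func(string)) {
	jobs := make(chan string)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1) // counted before the goroutine starts, so Wait cannot miss it
		go worker(jobs, &wg, handle) // pointer: every worker shares one WaitGroup
	}

	for _, job := range list {
		jobs <- job
	}

	close(jobs) // lets each worker's range loop end
	wg.Wait()
}

// countProcessed reports how many jobs were handled, using an atomic
// counter because handle runs concurrently in several goroutines.
func countProcessed(list []string, workers int) int64 {
	var n int64
	processAll(list, workers, func(string) { atomic.AddInt64(&n, 1) })
	return n
}

func main() {
	list := []string{"a", "b", "c", "d", "e"}
	fmt.Println(countProcessed(list, 10)) // prints 5
}
```

Because Add runs in the calling goroutine before each go statement, the counter already equals the number of workers by the time Wait is reached, regardless of how the goroutines are scheduled.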

Upvotes: 1
