Sammi Kerra

Reputation: 25

Go waitgroup with channel (worker)

I'm trying to make a simple worker pool in Go. After adding a WaitGroup to the following program, I'm getting a deadlock. What is the core reason behind it?

When I don't use the WaitGroup, the program seems to work fine.

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0001b2ea8)

Program:

package main

import (
    "fmt"
    "strconv"
    "sync"
)

func main() {
    workerSize := 2
    ProcessData(workerSize)
}

// ProcessData :
func ProcessData(worker int) {

    // Create the jobs channel for passing jobs to the workers
    JobChan := make(chan string)

    //Produce the jobs
    var jobsArr []string
    for i := 1; i <= 10000; i++ {
        jobsArr = append(jobsArr, "Test "+strconv.Itoa(i))
    }

    //Assign jobs to worker from jobs pool
    var wg sync.WaitGroup
    for w := 1; w <= worker; w++ {
        wg.Add(1)
        // Consumer
        go func(jw int, wg1 *sync.WaitGroup) {
            defer wg1.Done()
            for job := range JobChan {
                actualProcess(job, jw)
            }
        }(w, &wg)
    }

    // Create jobs pool
    for _, job := range jobsArr {
        JobChan <- job
    }

    wg.Wait()
    //close(JobChan)
}

func actualProcess(job string, worker int) {
    fmt.Println("WorkerID: #", worker, ", Job Value: ", job)
}

Upvotes: 0

Views: 1538

Answers (2)

shmsr

Reputation: 4204

This is a slightly modified but more advanced version of your implementation. I have commented the code so that it's easy to understand. You can now configure the number of jobs and the number of workers, and you can see how the jobs are distributed among the workers so that each one gets roughly an equal share of the work.

package main

import (
    "fmt"
)

func main() {
    var jobsCount = 10000 // Number of jobs
    var workerCount = 2   // Number of workers
    processData(workerCount, jobsCount)
}

func processData(workers, numJobs int) {
    var jobsArr = make([]string, 0, numJobs)
    // Fill jobsArr with numJobs jobs
    for i := 0; i < numJobs; i++ {
        // Fill in jobs
        jobsArr = append(jobsArr, fmt.Sprintf("Test %d", i+1))
    }
    var jobChan = make(chan string, 1)
    defer close(jobChan)
    var (
        // Length of jobsArr
        length = len(jobsArr)
        // Calculate average chunk size
        chunks = len(jobsArr) / workers
        // Window Start Index
        wStart = 0
        // Window End Index
        wEnd = chunks
    )
    // Split the jobs between workers. Every worker gets a chunk of jobsArr
    // to work on. The distribution is approximately equal: the last worker
    // also takes the remainder when numJobs is not divisible by workers.
    for i := 1; i <= workers; i++ {
        // Spawn one goroutine per worker for its chunk, i.e., jobsArr[wStart:wEnd]
        go func(wrk, s, e int) {
            for j := s; j < e; j++ {
                // Do the actual work and send actualProcess's return value
                // to jobChan
                jobChan <- actualProcess(wrk, jobsArr[j])
            }
        }(i, wStart, wEnd)
        // Advance the window to the next worker's chunk
        wStart = wEnd
        wEnd += chunks
        if i == workers-1 {
            // If the next worker is the last one,
            // extend its window to the end of jobsArr
            wEnd = length
        }
    }
    for i := 0; i < numJobs; i++ {
        // Receive one result per job
        fmt.Println(<-jobChan)
    }
}

func actualProcess(worker int, job string) string {
    return fmt.Sprintf("WorkerID: #%d, Job Value: %s", worker, job)
}
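The window arithmetic in the loop above can be pulled out into a small helper to check which slice each worker receives. This is only a sketch: chunkBounds is a hypothetical name not used in the answer, and it mirrors the answer's rule that every worker gets numJobs/workers jobs while the last worker also takes the remainder.

```go
package main

import "fmt"

// chunkBounds is a hypothetical helper that returns the [start, end) window
// each worker receives, using the same rule as processData above: every
// worker gets n/workers jobs and the last worker also takes the remainder.
func chunkBounds(n, workers int) [][2]int {
	chunk := n / workers
	bounds := make([][2]int, 0, workers)
	start := 0
	for i := 1; i <= workers; i++ {
		end := start + chunk
		if i == workers {
			// The last worker runs to the end of the slice.
			end = n
		}
		bounds = append(bounds, [2]int{start, end})
		start = end
	}
	return bounds
}

func main() {
	// 10 jobs over 3 workers: 3 + 3 + 4.
	fmt.Println(chunkBounds(10, 3)) // [[0 3] [3 6] [6 10]]
}
```

With 10000 jobs and 2 workers this gives each worker exactly 5000 jobs; any remainder only ever lands on the last worker.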

Upvotes: 1

Marc

Reputation: 21055

Once all the jobs are consumed, your workers block in for job := range JobChan, waiting for more data. A range loop over a channel only finishes when the channel is closed.

On the other hand, your main goroutine is blocked in wg.Wait() and never reaches the (commented out) close.

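At this point, all goroutines are stuck waiting either for data or for the WaitGroup to be done. Since none of them can make progress, the Go runtime aborts with "fatal error: all goroutines are asleep - deadlock!". Without the WaitGroup, main simply returns after the last send and the program exits, so the blocked workers never cause a problem.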
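A minimal sketch of the range-over-channel behaviour described above, using a hypothetical consume function in place of a worker: the loop drains the buffered values and only returns because the channel was closed.

```go
package main

import "fmt"

// consume ranges over ch until it is closed and returns how many values it
// received. Once the buffer is drained, the loop exits only because ch has
// been closed; otherwise it would block forever, like the workers above.
func consume(ch <-chan string) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := make(chan string, 3)
	ch <- "a"
	ch <- "b"
	ch <- "c"
	// Remove this close and consume never returns: main is then the only
	// goroutine and it is blocked, so the runtime reports a deadlock.
	close(ch)
	fmt.Println(consume(ch)) // 3
}
```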

The simplest solution is to call close(JobChan) directly after sending all jobs to the channel:

    // Create jobs pool
    for _, job := range jobsArr {
        JobChan <- job
    }

    close(JobChan)
    wg.Wait()
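For reference, here is a sketch of the question's program with this fix applied. The numJobs parameter, the int return value, and the mutex-guarded counter are hypothetical additions so the result can be checked; the worker loop, close(JobChan), and wg.Wait() ordering follow the answer.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// ProcessData runs the worker pool with the fix applied: the producer closes
// JobChan after sending every job, so each worker's range loop ends, Done is
// called, and wg.Wait can return. The numJobs parameter and the returned
// count are hypothetical additions for illustration.
func ProcessData(worker, numJobs int) int {
	JobChan := make(chan string)

	var mu sync.Mutex
	processed := 0

	var wg sync.WaitGroup
	for w := 1; w <= worker; w++ {
		wg.Add(1)
		// Consumer
		go func(jw int) {
			defer wg.Done()
			for job := range JobChan {
				actualProcess(job, jw)
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}(w)
	}

	// Send every job, then signal that no more are coming.
	for i := 1; i <= numJobs; i++ {
		JobChan <- "Test " + strconv.Itoa(i)
	}
	close(JobChan) // lets the workers' range loops terminate
	wg.Wait()      // returns once every worker has called Done
	return processed
}

func actualProcess(job string, worker int) {
	fmt.Println("WorkerID: #", worker, ", Job Value: ", job)
}

func main() {
	fmt.Println("processed:", ProcessData(2, 10))
}
```

Reading processed after wg.Wait() is safe because every Done happens before Wait returns.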
    

Upvotes: 3
