I'm trying to make a simple worker pool in Go. After adding a WaitGroup to the following program, I get a deadlock. What is the core reason behind it?
When I don't use the WaitGroup, the program seems to work fine.
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0001b2ea8)
Program:
package main

import (
	"fmt"
	"strconv"
	"sync"
)

func main() {
	workerSize := 2
	ProcessData(workerSize)
}

// ProcessData :
func ProcessData(worker int) {
	// Create Jobs Pool for passing jobs to worker
	JobChan := make(chan string)
	//Produce the jobs
	var jobsArr []string
	for i := 1; i <= 10000; i++ {
		jobsArr = append(jobsArr, "Test "+strconv.Itoa(i))
	}
	//Assign jobs to worker from jobs pool
	var wg sync.WaitGroup
	for w := 1; w <= worker; w++ {
		wg.Add(1)
		// Consumer
		go func(jw int, wg1 *sync.WaitGroup) {
			defer wg1.Done()
			for job := range JobChan {
				actualProcess(job, jw)
			}
		}(w, &wg)
	}
	// Create jobs pool
	for _, job := range jobsArr {
		JobChan <- job
	}
	wg.Wait()
	//close(JobChan)
}

func actualProcess(job string, worker int) {
	fmt.Println("WorkerID: #", worker, ", Job Value: ", job)
}
This is a slightly modified but more advanced version of your implementation. I have commented the code well so that it's easy to understand. You can now configure the number of jobs and the number of workers, and see how the jobs are distributed among the workers so that each gets roughly the same amount of work.
package main

import (
	"fmt"
)

func main() {
	var jobsCount = 10000 // Number of jobs
	var workerCount = 2   // Number of workers
	processData(workerCount, jobsCount)
}

func processData(workers, numJobs int) {
	var jobsArr = make([]string, 0, numJobs)
	// Fill jobsArr with numJobs jobs
	for i := 0; i < numJobs; i++ {
		jobsArr = append(jobsArr, fmt.Sprintf("Test %d", i+1))
	}

	var jobChan = make(chan string, 1)
	defer close(jobChan)

	var (
		// Length of jobsArr
		length = len(jobsArr)
		// Average chunk size per worker
		chunks = len(jobsArr) / workers
		// Window start index
		wStart = 0
		// Window end index
		wEnd = chunks
	)

	// Split the jobs between the workers: every worker gets a chunk of jobsArr
	// to work on. The distribution is approximately equal; the last worker
	// also takes any remainder, so it may get slightly more work.
	for i := 1; i <= workers; i++ {
		// Spawn a goroutine for each worker to handle its chunk, i.e. jobsArr[wStart:wEnd]
		go func(wrk, s, e int) {
			for j := s; j < e; j++ {
				// Do the actual work and send actualProcess's return value to jobChan
				jobChan <- actualProcess(wrk, jobsArr[j])
			}
		}(i, wStart, wEnd)

		// Move the window to the next chunk for the next iteration
		wStart = wEnd
		wEnd += chunks
		if i == workers-1 {
			// If the next worker is the last one, let it run to the end
			wEnd = length
		}
	}

	for i := 0; i < numJobs; i++ {
		// Receive all processed results
		fmt.Println(<-jobChan)
	}
}

func actualProcess(worker int, job string) string {
	return fmt.Sprintf("WorkerID: #%d, Job Value: %s", worker, job)
}
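The window arithmetic in processData can be pulled out into a small pure function and checked on its own. This is only an illustrative sketch: chunkBounds is a hypothetical helper name, and it assumes workers > 0 (with workers == 0 the integer division panics, just as it would in processData).

```go
package main

import "fmt"

// chunkBounds is a hypothetical helper that reproduces processData's window
// arithmetic: worker i (0-based) gets the half-open range [i*chunk, i*chunk+chunk),
// except the last worker, whose window runs to numJobs and absorbs the remainder.
// Assumes workers > 0.
func chunkBounds(numJobs, workers int) [][2]int {
	bounds := make([][2]int, 0, workers)
	chunk := numJobs / workers
	for i := 0; i < workers; i++ {
		start := i * chunk
		end := start + chunk
		if i == workers-1 {
			end = numJobs // last worker runs to the end of the slice
		}
		bounds = append(bounds, [2]int{start, end})
	}
	return bounds
}

func main() {
	fmt.Println(chunkBounds(10000, 2)) // [[0 5000] [5000 10000]]
	fmt.Println(chunkBounds(10, 3))    // [[0 3] [3 6] [6 10]]
}
```

The second call shows the remainder case: 10 jobs over 3 workers gives chunks of 3, and the last worker handles 4.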
Once all the jobs are consumed, your workers are blocked in for job := range JobChan, waiting for more data. That loop does not finish until the channel is closed.
Meanwhile, your main goroutine is blocked in wg.Wait() and never reaches the (commented-out) close.
At this point, every goroutine is stuck waiting either for data or for the WaitGroup to be done, so the runtime reports the deadlock.
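The mechanism can be reproduced without the whole pool. In this minimal sketch (rangeFinishes is a hypothetical helper, and the timeout exists only so the demo does not block forever), a single goroutine ranges over an unbuffered channel the same way the workers do. Its loop ends only if the channel is closed after the last send.

```go
package main

import (
	"fmt"
	"time"
)

// rangeFinishes (hypothetical helper) sends one job to a goroutine that ranges
// over an unbuffered channel, like the workers' `for job := range JobChan`,
// optionally closes the channel, and reports whether the goroutine's loop
// ended within timeoutMs milliseconds.
func rangeFinishes(closeIt bool, timeoutMs int) bool {
	ch := make(chan string)
	done := make(chan struct{})
	go func() {
		for range ch {
			// Process the job; the loop then waits for the next one.
		}
		close(done) // reached only once ch is closed and drained
	}()

	ch <- "Test 1" // the only job is received by the goroutine
	if closeIt {
		close(ch)
	}

	select {
	case <-done:
		return true
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		// Still blocked waiting for more data. In this sketch the goroutine is
		// simply leaked; in the question, wg.Wait() would block forever instead.
		return false
	}
}

func main() {
	fmt.Println("without close:", rangeFinishes(false, 100))
	fmt.Println("with close:", rangeFinishes(true, 100))
}
```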
The simplest solution is to call close(JobChan) directly after sending all jobs to the channel:
// Create jobs pool
for _, job := range jobsArr {
	JobChan <- job
}
close(JobChan)
wg.Wait()
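Here is the question's program with that fix applied, written as a sketch. To make it self-checking, it assumes two changes that are not in the original: ProcessData takes the number of jobs as a parameter instead of building jobsArr up front, and it returns how many jobs the workers processed (counted with sync/atomic).

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
)

// ProcessData runs the worker pool from the question with close(JobChan)
// moved before wg.Wait(). The numJobs parameter and the int64 return value
// (number of jobs processed) are illustrative additions, not the original API.
func ProcessData(worker int, numJobs int) int64 {
	JobChan := make(chan string)
	var processed int64
	var wg sync.WaitGroup
	for w := 1; w <= worker; w++ {
		wg.Add(1)
		// Consumer: exits its range loop once JobChan is closed and drained
		go func(jw int, wg1 *sync.WaitGroup) {
			defer wg1.Done()
			for job := range JobChan {
				actualProcess(job, jw)
				atomic.AddInt64(&processed, 1)
			}
		}(w, &wg)
	}
	// Producer: main is the only sender, so it may close the channel afterwards
	for i := 1; i <= numJobs; i++ {
		JobChan <- "Test " + strconv.Itoa(i)
	}
	close(JobChan) // lets every worker's range loop terminate
	wg.Wait()      // returns once every worker has called Done
	return atomic.LoadInt64(&processed)
}

func actualProcess(job string, worker int) {
	fmt.Println("WorkerID: #", worker, ", Job Value: ", job)
}

func main() {
	fmt.Println("processed:", ProcessData(2, 10))
}
```

Closing right after the send loop is safe here because main is the only sender. With several sending goroutines you would need another way to decide when to close, for example a second WaitGroup for the senders.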