Reputation: 1057
I am trying to implement a simple worker pool in Go and keep running into issues. All I want to do is have a set number of workers that do a set amount of work before getting more work to do. The code I am using looks similar to:
	jobs := make(chan imageMessage, 1)
	results := make(chan imageMessage, 1)

	for w := 0; w < 2; w++ {
		go worker(jobs, results)
	}

	for j := 0; j < len(images); j++ {
		jobs <- imageMessage{path: paths[j], img: images[j]}
	}
	close(jobs)

	for r := 0; r < len(images); r++ {
		<-results
	}
}
func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
	for j := range jobs {
		processImage(j.path, j.img)
		results <- j
	}
}
My understanding is that this should create 2 workers that can each do 1 "thing" at a time, and will continue to get more work as they complete that 1 thing, until there is nothing else to do. However, I get fatal error: all goroutines are asleep - deadlock!
If I set the buffer to something huge like 100, this works, but I want to be able to limit the work done at a time.
I feel like I'm close, but obviously missing something.
Upvotes: 3
Views: 2317
Reputation: 1
You can use this worker. Simple and efficient. https://github.com/tamnguyenvt/go-worker
NewWorkerManager(WorkerManagerParams{
	WorkerSize:    <number of workers>,
	RelaxAfter:    <sleep for a while to relax server after given duration>,
	RelaxDuration: <relax duration>,
	WorkerFunc:    <your worker function here>,
	LogEnable:     <enable log or not>,
	StopTimeout:   <timeout all workers after given duration>,
})
Upvotes: 0
Reputation: 417572
The problem is that you only start "draining" the results channel once you have successfully sent all jobs on the jobs channel. But for you to be able to send all jobs, either the jobs channel must have a big enough buffer, or the worker goroutines must be able to consume jobs from it.

However, a worker goroutine, when consuming a job, sends the result on the results channel before it can take the next one. If the buffer of the results channel is full, sending the result will block.

That last part (a worker goroutine blocked on sending a result) can only be "unblocked" by receiving from the results channel, which you don't do until you have sent all the jobs. You get a deadlock if the buffers of the jobs channel and the results channel cannot hold all your jobs. This also explains why it works if you increase the buffer size to a big value: if all the jobs fit into the buffers, the deadlock cannot occur, and once all jobs have been sent successfully, your final loop drains the results channel.

The solution? Run the generating-and-sending of jobs in its own goroutine, so you can start receiving from the results channel "immediately", without having to wait until all jobs are sent. This way the worker goroutines will never be blocked forever trying to send results:
go func() {
	for j := 0; j < len(images); j++ {
		jobs <- imageMessage{path: paths[j], img: images[j]}
	}
	close(jobs)
}()
Try it on the Go Playground.
Also check out a similar implementation in Is this an idiomatic worker thread pool in Go?
Upvotes: 4