FryDay

Reputation: 1057

Simple worker pool in Go

I am trying to implement a simple worker pool in Go and keep running into issues. All I want is a set number of workers that each do a set amount of work before picking up more work to do. The code I am using looks similar to:

    jobs := make(chan imageMessage, 1)
    results := make(chan imageMessage, 1)

    // start 2 workers
    for w := 0; w < 2; w++ {
        go worker(jobs, results)
    }

    // send every job, then close the jobs channel
    for j := 0; j < len(images); j++ {
        jobs <- imageMessage{path: paths[j], img: images[j]}
    }
    close(jobs)

    // collect one result per image
    for r := 0; r < len(images); r++ {
        <-results
    }
}

func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
    for j := range jobs {
        processImage(j.path, j.img)
        results <- j
    }
}

My understanding is that this should create 2 workers that can each do 1 "thing" at a time, and each will keep picking up more work as it finishes, until there is nothing left to do. However, I get: fatal error: all goroutines are asleep - deadlock!

If I set the buffer to something huge like 100, this works, but I want to be able to limit how much work is in flight at a time.

I feel like I'm close, but obviously missing something.

Upvotes: 3

Views: 2317

Answers (2)

Tam Nguyen

Reputation: 1

You can use this worker pool; it's simple and efficient: https://github.com/tamnguyenvt/go-worker

NewWorkerManager(WorkerManagerParams{
    WorkerSize:    <number of workers>,
    RelaxAfter:    <sleep for a while to give the server a rest after the given duration>,
    RelaxDuration: <relax duration>,
    WorkerFunc:    <your worker function here>,
    LogEnable:     <enable logging or not>,
    StopTimeout:   <stop all workers after the given duration>,
})

Upvotes: 0

icza

Reputation: 417572

The problem is that you only start "draining" the results channel once you have successfully sent all the jobs on the jobs channel. But for you to be able to send all the jobs, either the jobs channel must have a big enough buffer, or the worker goroutines must be able to consume jobs from it.

But when a worker goroutine consumes a job, it sends the result on the results channel before it can take the next one. If the buffer of the results channel is full, sending the result blocks.

And the last part – a worker goroutine blocked on sending a result – can only be "unblocked" by receiving from the results channel, which you don't do until all the jobs have been sent. So you get a deadlock whenever the buffers of the jobs and results channels cannot hold all your jobs: with a buffer of 1 on each channel and 2 workers, only about 4 jobs can be absorbed before the main goroutine blocks forever on its send. This also explains why it works when you increase the buffer size to something huge: if all the jobs fit into the buffers, the deadlock does not occur, and after all jobs are sent successfully, your final loop drains the results channel.

The solution? Generate and send the jobs in their own goroutine, so you can start receiving from the results channel "immediately" instead of waiting until all jobs are sent, which means the worker goroutines never end up blocked forever trying to send results:

go func() {
    // Sending jobs from a separate goroutine lets the main goroutine
    // drain results concurrently, so workers never block for good
    // on a full results channel.
    for j := 0; j < len(images); j++ {
        jobs <- imageMessage{path: paths[j], img: images[j]}
    }
    close(jobs)
}()

Try it on the Go Playground.
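For completeness, here is a minimal, self-contained version of the fixed program. The sample paths, the nil image placeholders and the no-op processImage are assumptions for illustration, since the question doesn't show them:

    package main

    import (
        "fmt"
        "image"
    )

    type imageMessage struct {
        path string
        img  image.Image
    }

    // processImage stands in for the real work, which the question doesn't show.
    func processImage(path string, img image.Image) {
        fmt.Println("processed", path)
    }

    func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
        for j := range jobs {
            processImage(j.path, j.img)
            results <- j
        }
    }

    func main() {
        paths := []string{"a.png", "b.png", "c.png", "d.png", "e.png"}
        images := make([]image.Image, len(paths)) // nil placeholders

        jobs := make(chan imageMessage, 1)
        results := make(chan imageMessage, 1)

        for w := 0; w < 2; w++ {
            go worker(jobs, results)
        }

        // Send jobs from a separate goroutine so main can start
        // receiving results right away.
        go func() {
            for j := 0; j < len(images); j++ {
                jobs <- imageMessage{path: paths[j], img: images[j]}
            }
            close(jobs)
        }()

        for r := 0; r < len(images); r++ {
            <-results
        }
    }

Run as-is, it processes all five jobs with two workers and exits cleanly, even with both buffers at 1.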

Also check out a similar implementation in Is this an idiomatic worker thread pool in Go?
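As in that linked answer, a common variation is to track the workers with a sync.WaitGroup and close the results channel once they are all done, so the consumer can simply range over results instead of counting them. A sketch of the change against the program above (add "sync" to its imports):

    // Replaces the worker-start and result-drain loops in the sketch above.
    var wg sync.WaitGroup
    for w := 0; w < 2; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker(jobs, results)
        }()
    }

    // Close results once every worker has returned, so the
    // range loop below terminates on its own.
    go func() {
        wg.Wait()
        close(results)
    }()

    for r := range results {
        _ = r // consume each result; no counting needed
    }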

Upvotes: 4
