Reputation: 2250
I'm writing an application that the user can start with a number of "jobs" (URLs actually). At the beginning (main routine), I add these URLs to a queue, then start x goroutines that work on these URLs.
In special cases, the resource a URL points to may contain even more URLs which have to be added to the queue. The 3 workers are waiting for new jobs to come in and process them. The problem is: once EVERY worker is waiting for a job (and none is producing any), the workers should stop altogether. So either all of them work or no one works.
My current implementation looks something like this and I don't think it's elegant. Unfortunately I couldn't think of a better way that wouldn't include race conditions and I'm not entirely sure if this implementation actually works as intended:
var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}
func work(working chan int) {
absent := make(chan struct{}, 1)
// if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
// This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
one := false
for {
select {
case u, ok := <-queue.Pop():
if !ok {
close(absent)
return
}
if !one {
// I have started working (delta + 1)
working <- 1
absent <- struct{}{}
one = true
}
// do work with u (which may lead to queue.Push(urls...))
case <-absent: // no jobs at the moment. consume absent => wait
one = false
working <- -1
}
}
}
func Start() {
working := make(chan int)
for i := 0; i < WORKER_COUNT; i++ {
go work(working)
}
// the amount of actually working workers...
sum := 0
for {
delta := <-working
sum += delta
if sum == 0 {
queue.Close() // close channel -> kill workers.
done <- struct{}{}
return
}
}
}
Is there a better way to tackle this problem?
Upvotes: 2
Views: 796
Reputation: 30027
You can use a sync.WaitGroup (see docs) to control the lifetime of the workers, and use a non-blocking send so workers can't deadlock when they try to queue up more jobs:
package main
import "sync"
const workers = 4
type job struct{}
func (j *job) do(enqueue func(job)) {
// do the job, calling enqueue() for subtasks as needed
}
func main() {
jobs, wg := make(chan job), new(sync.WaitGroup)
var enqueue func(job)
// workers
for i := 0; i < workers; i++ {
go func() {
for j := range jobs {
j.do(enqueue)
wg.Done()
}
}()
}
// how to queue a job
enqueue = func(j job) {
wg.Add(1)
select {
case jobs <- j: // another worker took it
default: // no free worker; do the job now
j.do(enqueue)
wg.Done()
}
}
todo := make([]job, 1000)
for _, j := range todo {
enqueue(j)
}
wg.Wait()
close(jobs)
}
The difficulty with trying to avoid deadlocks with a buffered channel is that you have to allocate a big enough channel up front to definitely hold all pending tasks without blocking. Problematic unless, say, you have a small and known number of URLs to crawl.
When you fall back to doing ordinary recursion in the current thread, you don't have that static buffer-size limit. Of course, there are still limits: you'd probably run out of RAM if too much work were pending, and theoretically you could exhaust the stack with deep recursion (but that's hard!). So you'd need to track pending tasks some more sophisticated way if you were, say, crawling the Web at large.
Finally, as a more complete example, I'm not super proud of this code, but I happened to write a function to kick off a parallel sort that's recursive in the same way your URL fetching is.
Upvotes: 4