Danyel
Danyel

Reputation: 2250

Worker pool for a potentially recursive task (i.e., each job can queue other jobs)

I'm writing an application that the user can start with a number of "jobs" (URLs actually). At the beginning (main routine), I add these URLs to a queue, then start x goroutines that work on these URLs.

In special cases, the resource a URL points to may contain even more URLs which have to be added to the queue. The 3 workers are waiting for new jobs to come in and process them. The problem is: once EVERY worker is waiting for a job (and none is producing any), the workers should stop altogether. So either all of them work or no one works.

My current implementation looks something like this and I don't think it's elegant. Unfortunately I couldn't think of a better way that wouldn't include race conditions and I'm not entirely sure if this implementation actually works as intended:

var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}

func work(working chan int) {
  absent := make(chan struct{}, 1)
  // if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
  // This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
  one := false
  for {
    select {
    case u, ok := <-queue.Pop():
      if !ok {
        close(absent)
        return
      }
      if !one {
        // I have started working (delta + 1)
        working <- 1
        absent <- struct{}{}
        one = true
      }
      // do work with u (which may lead to queue.Push(urls...))
    case <-absent: // no jobs at the moment. consume absent => wait
      one = false
      working <- -1
    }
  }
}

func Start() {
  working := make(chan int)
  for i := 0; i < WORKER_COUNT; i++ {
    go work(working)
  }
  // the amount of actually working workers...
  sum := 0
  for {
    delta := <-working
    sum += delta
    if sum == 0 {
      queue.Close() // close channel -> kill workers.
      done <- struct{}{}
      return
    }
  }
}

Is there a better way to tackle this problem?

Upvotes: 2

Views: 796

Answers (1)

twotwotwo
twotwotwo

Reputation: 30027

You can use a sync.WaitGroup (see docs) to control the lifetime of the workers, and use a non-blocking send so workers can't deadlock when they try to queue up more jobs:

package main

import "sync"

const workers = 4

type job struct{}

func (j *job) do(enqueue func(job)) {
    // do the job, calling enqueue() for subtasks as needed
}

func main() {
    jobs, wg := make(chan job), new(sync.WaitGroup)
    var enqueue func(job)

    // workers
    for i := 0; i < workers; i++ {
        go func() {
            for j := range jobs {
                j.do(enqueue)
                wg.Done()
            }
        }()
    }

    // how to queue a job
    enqueue = func(j job) {
        wg.Add(1)
        select {
        case jobs <- j: // another worker took it
        default: // no free worker; do the job now
            j.do(enqueue)
            wg.Done()
        }
    }

    todo := make([]job, 1000)
    for _, j := range todo {
        enqueue(j)
    }
    wg.Wait()
    close(jobs)
}

The difficulty with trying to avoid deadlocks with a buffered channel is that you have to allocate a big enough channel up front to definitely hold all pending tasks without blocking. Problematic unless, say, you have a small and known number of URLs to crawl.

When you fall back to doing ordinary recursion in the current thread, you don't have that static buffer-size limit. Of course, there are still limits: you'd probably run out of RAM if too much work were pending, and theoretically you could exhaust the stack with deep recursion (but that's hard!). So you'd need to track pending tasks some more sophisticated way if you were, say, crawling the Web at large.

Finally, as a more complete example, I'm not super proud of this code, but I happened to write a function to kick off a parallel sort that's recursive in the same way your URL fetching is.

Upvotes: 4

Related Questions