Cosmo Sterin

Reputation: 171

Job queue where workers can add jobs, is there an elegant solution to stop the program when all workers are idle?

I have a queue of jobs, and workers can add new jobs to it when they finish processing one.

For illustration, in the code below, a job consists of counting up to JOB_COUNTING_TO, and 1/5 of the time (at random) a worker will add a new job to the queue.

Because my workers can add jobs to the queue, my understanding is that I cannot use a channel as the job queue. Sending to a channel blocks, and even with a buffered channel this code, due to its recursive nature (jobs adding jobs), could easily reach a state where every worker is blocked sending to the channel and none is available to receive.

This is why I decided to use a shared queue protected by a mutex.

Now, I would like the program to halt when all the workers are idle. Unfortunately this cannot be detected just by checking for len(jobQueue) == 0, since the queue can be empty while some workers are still processing a job and might add new jobs afterwards.

The solution I came up with feels a bit clunky: it uses the variables var idleWorkerCount int and var isIdle [NB_WORKERS]bool to keep track of idle workers, and the program stops when idleWorkerCount == NB_WORKERS.

My question: is there a concurrency pattern I could use to make this logic more elegant?

Also, for a reason I don't understand, the technique I currently use (code below) becomes really inefficient when the number of workers gets large (such as 300,000 workers): for the same number of jobs, the code is more than 10x slower with NB_WORKERS = 300000 than with NB_WORKERS = 3000.

Thank you very much in advance!

package main

import (
    "math/rand"
    "sync"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool

func doJob(workerId int) {

    mu.Lock()

    if len(jobQueue) == 0 {
        if !isIdle[workerId] {
            idleWorkerCount += 1
        }
        isIdle[workerId] = true
        mu.Unlock()
        return
    }

    if isIdle[workerId] {
        idleWorkerCount -= 1
    }
    isIdle[workerId] = false

    var job int
    job, jobQueue = jobQueue[0], jobQueue[1:]
    mu.Unlock()

    for i := 0; i < job; i += 1 {
    }

    if rand.Intn(5) == 0 {
        mu.Lock()
        jobQueue = append(jobQueue, JOB_COUNTING_TO)
        mu.Unlock()
    }

}

func main() {

    // Filling up the queue with initial jobs
    for i := 0; i < NB_INITIAL_JOBS; i += 1 {
        jobQueue = append(jobQueue, JOB_COUNTING_TO)
    }

    var wg sync.WaitGroup
    for i := 0; i < NB_WORKERS; i += 1 {
        wg.Add(1)
        go func(workerId int) {
            for idleWorkerCount != NB_WORKERS {
                doJob(workerId)
            }
            wg.Done()
        }(i)
    }
    wg.Wait()
}

Upvotes: 0

Views: 622

Answers (1)

user4466350

Because my workers can add jobs to the queue

A re-entrant channel always deadlocks. This is easy to demonstrate with the following code:

package main

import (
    "fmt"
)

func main() {

    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            c <- v + " 2"
            out <- v
        }
    }()

    go func() {
        c <- "hello world!" // pass OK
        c <- "hello world!" // no pass, the routine is blocking at pushing to itself
    }()

    for v := range out {
        fmt.Println(v)
    }

}

While the program

  • tries to push at c <- v + " 2"

it cannot

  • read at for v := range c {,
  • push at c <- "hello world!", or
  • read at for v := range out {;

thus, it deadlocks.

If you want to get past that situation, you must overflow somewhere.

Onto goroutines, or somewhere else.

package main

import (
    "fmt"
    "time"
)

func main() {

    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            go func() { // use routines on the stack as a bank for the required overflow.
                <-time.After(time.Second) // simulate slowness.
                c <- v + " 2"
            }()
            out <- v
        }
    }()

    go func() {
        for {
            c <- "hello world!"
        }
    }()

    exit := time.After(time.Second * 60)
    for v := range out {
        fmt.Println(v)
        select {
        case <-exit:
            return
        default:
        }
    }
}

But now you have a new problem.

You have created a memory bomb by overflowing without limit onto the stack. Technically, this depends on the time needed to finish a job, the memory available, the speed of your CPUs, and the shape of the data (a given item might or might not generate a new job). So there is an upper limit, but it is so hard to make sense of it that in practice this ends up being a bomb.

Consider not overflowing without limits on the stack.

If you don't have any arbitrary limit on hand, you can use a semaphore to cap the overflow.

https://play.golang.org/p/5JWPQiqOYKz

My bombs did not explode with a work timeout of 1s and 2s, but they took a large chunk of memory.

In another round, with modified code, it did explode.

Of course, because you use if rand.Intn(5) == 0 { in your code, the problem is largely mitigated. Still, when you meet such a pattern, think twice about the code.

Also, for a reason I don't understand, the technique I currently use (code below) becomes really inefficient when the number of workers gets large (such as 300,000 workers): for the same number of jobs, the code is more than 10x slower with NB_WORKERS = 300000 than with NB_WORKERS = 3000.

In the big picture, you have a limited number of CPU cycles. All those allocations and instructions needed to spawn and synchronize have to be executed too. Concurrency is not free.

Now, I would like the program to halt when all the workers are idle.

I came up with the following, but I find it very difficult to reason about and to convince myself it won't end in a write-on-closed-channel panic.

The idea is to use a sync.WaitGroup to count in-flight items and rely on it to properly close the input channel and finish the job.

package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

func main() {

    rand.Seed(time.Now().UnixNano())

    var wg sync.WaitGroup
    var wgr sync.WaitGroup
    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            if rand.Intn(5) == 0 {
                wgr.Add(1) // count the re-submitted item before spawning
                go func(v string) {
                    <-time.After(time.Microsecond)
                    c <- v + " 2"
                }(v)
            }
            wgr.Done() // this item is handled; any follow-up was counted above
            out <- v
        }
        close(out)
    }()

    var sent int
    wg.Add(1)
    go func() {
        for i := 0; i < 300; i++ {
            wgr.Add(1) // count before sending so close(c) cannot race ahead
            c <- "hello world!"
            sent++
        }
        wg.Done()
    }()

    go func() {
        wg.Wait()  // all initial sends are done
        wgr.Wait() // no item is in flight anymore
        close(c)   // now nothing can send on c
    }()

    var rcv int
    for v := range out {
        // fmt.Println(v)
        _ = v
        rcv++
    }
    log.Println("sent", sent)
    log.Println("rcv", rcv)
}

I ran it with while go run -race .; do :; done and it worked fine for a reasonable number of iterations.

Upvotes: 2
