Reputation: 171
I find myself in a situation where I have a queue of jobs, and workers can add new jobs to it when they are done processing one.
For illustration, in the code below, a job consists of counting up to JOB_COUNTING_TO, and, 1/5 of the time, a worker will randomly add a new job to the queue.
Because my workers can add jobs to the queue, it is my understanding that I cannot use a channel as my job queue. This is because sending to a channel blocks and, even with a buffered channel, this code, due to its recursive nature (jobs adding jobs), could easily reach a situation where all the workers are sending to the channel and no worker is available to receive.
This is why I decided to use a shared queue protected by a mutex.
Now, I would like the program to halt when all the workers are idle. Unfortunately this cannot be detected just by checking for len(jobQueue) == 0, as the queue could be empty while some workers are still doing their job and might add a new job after that.
The solution I came up with feels a bit clunky: it makes use of the variables var idleWorkerCount int and var isIdle [NB_WORKERS]bool to keep track of idle workers, and the code stops when idleWorkerCount == NB_WORKERS.
My question is whether there is a concurrency pattern that I could use to make this logic more elegant?
Also, for some reason I don't understand, the technique I currently use (code below) becomes really inefficient when the number of workers gets quite big (such as 300000 workers): for the same number of jobs, the code will be > 10x slower with NB_WORKERS = 300000 than with NB_WORKERS = 3000.
Thank you very much in advance!
package main

import (
    "math/rand"
    "sync"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool

func doJob(workerId int) {
    mu.Lock()
    if len(jobQueue) == 0 {
        // queue is empty: mark this worker as idle
        if !isIdle[workerId] {
            idleWorkerCount += 1
        }
        isIdle[workerId] = true
        mu.Unlock()
        return
    }
    if isIdle[workerId] {
        idleWorkerCount -= 1
    }
    isIdle[workerId] = false
    // pop the next job off the queue
    var job int
    job, jobQueue = jobQueue[0], jobQueue[1:]
    mu.Unlock()
    // simulate the work: count up to the job's value
    for i := 0; i < job; i += 1 {
    }
    // 1/5 of the time, add a new job to the queue
    if rand.Intn(5) == 0 {
        mu.Lock()
        jobQueue = append(jobQueue, JOB_COUNTING_TO)
        mu.Unlock()
    }
}

func main() {
    // Filling up the queue with initial jobs
    for i := 0; i < NB_INITIAL_JOBS; i += 1 {
        jobQueue = append(jobQueue, JOB_COUNTING_TO)
    }
    var wg sync.WaitGroup
    for i := 0; i < NB_WORKERS; i += 1 {
        wg.Add(1)
        go func(workerId int) {
            // NOTE: this read of idleWorkerCount is not protected by the mutex
            for idleWorkerCount != NB_WORKERS {
                doJob(workerId)
            }
            wg.Done()
        }(i)
    }
    wg.Wait()
}
Upvotes: 0
Views: 622
Reputation:
Because my workers can add jobs to the queue
A re-entrant channel always deadlocks. This is easy to demonstrate using this code:
package main

import (
    "fmt"
)

func main() {
    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            c <- v + " 2"
            out <- v
        }
    }()
    go func() {
        c <- "hello world!" // pass OK
        c <- "hello world!" // no pass, the routine is blocking at pushing to itself
    }()
    for v := range out {
        fmt.Println(v)
    }
}
While the program is blocked sending at c <- v + " 2", it cannot return to for v := range c { to receive; consequently the second c <- "hello world!" never gets through, for v := range out { never receives a value, and thus, it deadlocks.
If you want to get past that situation, you must overflow somewhere: onto goroutines, or somewhere else.
package main

import (
    "fmt"
    "time"
)

func main() {
    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            go func(v string) { // use routines on the stack as a bank for the required overflow.
                <-time.After(time.Second) // simulate slowness.
                c <- v + " 2"
            }(v) // pass v as an argument so the goroutine does not capture the loop variable
            out <- v
        }
    }()
    go func() {
        for {
            c <- "hello world!"
        }
    }()
    exit := time.After(time.Second * 60)
    for v := range out {
        fmt.Println(v)
        select {
        case <-exit:
            return
        default:
        }
    }
}
But now you have a new problem: you have created a memory bomb by overflowing onto goroutines without limit. Technically, whether it blows up depends on the time needed to finish a job, the memory available, the speed of your CPUs and the shape of the data (an item might or might not generate a new job). So there is an upper limit, but it is so hard to reason about it that, in practice, this ends up being a bomb.
Consider not overflowing without limits onto goroutines.
If you don't have any arbitrary limit on hand, you can use a semaphore to cap the overflow.
https://play.golang.org/p/5JWPQiqOYKz
My bombs did not explode with a work timeout of 1s or 2s, but they took a large chunk of memory. In another round, with modified code, they exploded.
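For illustration, a buffered channel can serve as that counting semaphore. The sketch below is my own rough reconstruction of the idea, not the code behind the playground link, and it deliberately sheds re-entrant jobs once the cap is reached so that the example stays deadlock-free:

package main

import (
    "fmt"
    "time"
)

func main() {
    out := make(chan string)
    c := make(chan string)
    // Buffered channel used as a counting semaphore: at most cap(sem)
    // re-entrant goroutines may exist at once.
    sem := make(chan struct{}, 100)
    go func() {
        for v := range c {
            select {
            case sem <- struct{}{}: // a slot is free: overflow onto a goroutine
                go func(v string) {
                    defer func() { <-sem }()       // give the slot back when done
                    <-time.After(time.Millisecond) // simulate slowness
                    c <- v + " 2"
                }(v)
            default:
                // cap reached: shed this re-entrant job instead of growing
                // the number of goroutines without bound
            }
            out <- v
        }
    }()
    go func() {
        for {
            c <- "hello world!"
        }
    }()
    exit := time.After(time.Second)
    for v := range out {
        fmt.Println(v)
        select {
        case <-exit:
            return
        default:
        }
    }
}

The select with a default case is what keeps the consumer from blocking on a full semaphore while the slot holders are themselves blocked sending to c; a blocking acquire there would reintroduce the deadlock.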
Of course, because you use if rand.Intn(5) == 0 in your code, the problem is largely mitigated. Still, when you meet such a pattern, think twice about the code.
Also, for some reason I don't understand, the technique I currently use (code below) becomes really inefficient when the number of workers gets quite big (such as 300000 workers): for the same number of jobs, the code will be > 10x slower with NB_WORKERS = 300000 than with NB_WORKERS = 3000.
In the big picture, you have a limited amount of CPU cycles. All those allocations and instructions, to spawn and synchronize, have to be executed too. Concurrency is not free.
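As a minimal sketch of that point (the sizing advice here is mine, not part of the answer above): for CPU-bound jobs like the busy-loop in the question, a pool sized to the number of cores gets the same work done with far less scheduling and synchronization overhead than 300000 goroutines. This sketch sidesteps the re-entrant case and only illustrates pool sizing:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    // For CPU-bound work, more workers than cores only adds scheduling
    // and mutex-contention overhead.
    nbWorkers := runtime.GOMAXPROCS(0) // defaults to runtime.NumCPU()

    jobs := make(chan int, 300)
    for i := 0; i < 300; i++ {
        jobs <- 10000000
    }
    close(jobs) // the job set is fixed in this sketch, so the channel can be closed up front

    var wg sync.WaitGroup
    for w := 0; w < nbWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                for i := 0; i < job; i++ { // simulate the counting work
                }
            }
        }()
    }
    wg.Wait()
    fmt.Println("done with", nbWorkers, "workers")
}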
Now, I would like the program to halt when all the workers are idle.
I came up with the following, but I find it very difficult to reason about and to convince myself it won't end up in a write-on-closed-channel panic.
The idea is to use a sync.WaitGroup to count in-flight items and rely on it to properly close the input channel and finish the job.
package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    var wg sync.WaitGroup  // counts the producers still sending initial items
    var wgr sync.WaitGroup // counts items in flight on c
    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            if rand.Intn(5) == 0 {
                wgr.Add(1) // register the new in-flight item before spawning it
                go func(v string) {
                    <-time.After(time.Microsecond)
                    c <- v + " 2"
                }(v)
            }
            wgr.Done() // the received item is no longer in flight
            out <- v
        }
        close(out)
    }()
    var sent int
    wg.Add(1)
    go func() {
        for i := 0; i < 300; i++ {
            wgr.Add(1)
            c <- "hello world!"
            sent++
        }
        wg.Done()
    }()
    go func() {
        // once the producer is done and no item is in flight,
        // it is safe to close the input channel
        wg.Wait()
        wgr.Wait()
        close(c)
    }()
    var rcv int
    for v := range out {
        // fmt.Println(v)
        _ = v
        rcv++
    }
    log.Println("sent", sent)
    log.Println("rcv", rcv)
}
I ran it with while go run -race .; do :; done and it worked fine for a reasonable number of iterations.
Upvotes: 2