Reputation: 128
I am working on a small script that uses channels, select, and goroutines, and I don't understand why it doesn't behave the way I expect.
I have 2 channels that all my goroutines listen to.
I pass the channels to each goroutine, which has a select that must choose between the 2 depending on which channel has data first.
The problem is that no goroutine ever reaches the second case. I can receive 100 jobs one after the other and I see everything in the log: each worker does what is requested in the first case and then sends the job to the second channel (assuming that send even succeeds), but after that I get no more logs. I just don't understand why...
Can someone enlighten me? :)
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := new(sync.WaitGroup)
	in := make(chan *Job)
	out := make(chan *Job)
	results := make(chan *Job)
	for i := 0; i < 50; i++ {
		go work(wg, in, out, results)
	}
	wg.Wait()
	// Finally we collect all the results of the work.
	for elem := range results {
		fmt.Println(elem)
	}
}

func work(wg *sync.WaitGroup, in chan *Job, out chan *Job, results chan *Job) {
	wg.Add(1)
	defer wg.Done()
	for {
		select {
		case job := <-in:
			ticker := time.Tick(10 * time.Second)
			select {
			case <-ticker:
				// DO stuff
				if condition is true {
					out <- job
				}
			case <-time.After(5 * time.Minute):
				fmt.Println("Timeout")
			}
		case job := <-out:
			ticker := time.Tick(1 * time.Minute)
			select {
			case <-ticker:
				// DO stuff
				if condition is true {
					results <- job
				}
			case <-quitOut:
				fmt.Println("Job completed")
			}
		}
	}
}
I create a number of workers that listen to 2 channels and send the final results to a 3rd.
Each worker does something with the received job; if the job passes a given condition, the worker sends it to the next channel, and if it passes a second condition there, the worker sends it to the results channel.
So, in my head, the pipeline looks like this for 5 workers, for example: 3 jobs arrive in the IN channel and 3 workers take them immediately; if all 3 jobs pass the condition, they are sent to the OUT channel. The 2 idle workers take 2 of them right away, and the 3rd job is picked up by one of the first 3 workers ...
I hope this gives a better understanding of my code above. But when it runs, I never get to the second case.
Upvotes: 0
Views: 4196
Reputation: 3682
I think your solution might be a bit overcomplicated. Here is a simplified version. Bear in mind that there are numerous possible implementations. A good article to read:
https://medium.com/smsjunk/handling-1-million-requests-per-minute-with-golang-f70ac505fcaa
Or, even better, Go by Example:
https://gobyexample.com/worker-pools (which I think may be what you were aiming for)
Anyway, the code below serves as a different kind of example. There are several ways to solve this problem.
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

type worker struct {
	wg   *sync.WaitGroup
	in   chan job
	quit context.Context
}

type job struct {
	message int
}

func main() {
	numberOfJobs := 50
	// The context acts as the quit signal: it is cancelled after 3 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	w := worker{
		wg:   &sync.WaitGroup{},
		in:   make(chan job),
		quit: ctx,
	}
	// Each producer goroutine blocks until the loop below receives its job.
	for i := 0; i < numberOfJobs; i++ {
		go func(i int) {
			w.in <- job{message: i}
		}(i)
	}
	counter := 0
	for {
		// No default case: select blocks until a job or the quit signal arrives,
		// instead of spinning in a busy loop.
		select {
		case j := <-w.in:
			counter++
			log.Printf("Received job %+v\n", j)
			// DO SOMETHING WITH THE RECEIVED JOB
			// WORKING ON IT
			x := j.message * j.message
			log.Printf("job processed, result %d", x)
		case <-w.quit.Done():
			log.Printf("Received quit, timeout reached. Number of jobs queued: %d, Number of jobs complete: %d\n", numberOfJobs, counter)
			return // returning (rather than os.Exit) lets the deferred cancel run
		}
	}
}
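For the two-stage flow described in the question (IN, then OUT, then results), one possible arrangement is to give each stage its own pool of workers instead of having every worker select on both channels. The following is only a rough sketch under assumptions: the `Job` fields, `startStage`, `runPipeline`, and the even-number condition are all made-up placeholders, not the asker's real logic. It also calls `wg.Add` before the goroutines start and closes each stage's output channel once its workers finish, so the final `range` loop can terminate.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Job is a hypothetical stand-in for the question's Job type.
type Job struct {
	ID    int
	Value int
}

// startStage launches n workers that read from in, call fn on each job, and
// forward the job only when fn returns true. The returned channel is closed
// once in is closed and all n workers have finished.
func startStage(n int, in <-chan *Job, fn func(*Job) bool) <-chan *Job {
	out := make(chan *Job)
	var wg sync.WaitGroup
	wg.Add(n) // Add before the goroutines start, not inside them.
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for job := range in {
				if fn(job) {
					out <- job
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out) // lets the next stage's range loop end
	}()
	return out
}

// runPipeline pushes numJobs jobs through two stages and returns the sorted
// IDs of the jobs that reach the end.
func runPipeline(numJobs, workers int) []int {
	in := make(chan *Job)
	go func() {
		defer close(in)
		for i := 0; i < numJobs; i++ {
			in <- &Job{ID: i, Value: i}
		}
	}()

	// Stage 1 (placeholder condition): keep jobs with an even Value.
	stage1 := startStage(workers, in, func(j *Job) bool { return j.Value%2 == 0 })
	// Stage 2 (placeholder work): square the Value and always forward.
	results := startStage(workers, stage1, func(j *Job) bool {
		j.Value *= j.Value
		return true
	})

	var ids []int
	for j := range results {
		ids = append(ids, j.ID)
	}
	sort.Ints(ids)
	return ids
}

func main() {
	fmt.Println(runPipeline(10, 3)) // prints [0 2 4 6 8]
}
```

Because the main goroutine drains `results` while the workers are still running, no worker blocks forever on an unbuffered send, which is one way the original layout can stall.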
Upvotes: 3
Reputation: 42413
Your quitIn and quitOut channels are basically useless: you create them and try to receive from them, which can never succeed, because nobody can write to these channels when nobody else even knows they exist. I cannot say more because I do not understand what the code is supposed to do.
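To illustrate the point, a quit channel only does something when the code that decides to stop also holds it. Here is a minimal sketch, assuming a single worker and made-up names (`worker`, `runUntilQuit`, `done`): the caller creates `quit`, passes it to the worker, and closes it when the work is over.

```go
package main

import "fmt"

// worker counts the jobs it receives until quit is closed, then reports the
// count on done and returns.
func worker(jobs <-chan int, quit <-chan struct{}, done chan<- int) {
	count := 0
	for {
		select {
		case <-jobs:
			count++
		case <-quit:
			done <- count
			return
		}
	}
}

// runUntilQuit sends n jobs to one worker, then signals it to stop by closing
// quit. The caller owns quit, so something can actually trigger that case.
func runUntilQuit(n int) int {
	jobs := make(chan int)
	quit := make(chan struct{})
	done := make(chan int)
	go worker(jobs, quit, done)
	for i := 0; i < n; i++ {
		jobs <- i // unbuffered: returns only after the worker has received
	}
	close(quit) // a closed channel is readable by every receiver
	return <-done
}

func main() {
	fmt.Println(runUntilQuit(5)) // prints 5
}
```

Closing the channel, rather than sending on it, also works unchanged when several workers listen on the same `quit`.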
Upvotes: 0