bluebeel

Reputation: 128

Golang channel in select not receiving

I am working on a small program that uses channels, select, and goroutines, and I really don't understand why it doesn't behave the way I expect.

I have 2 channels that all my goroutines listen to.

I pass the channels to each goroutine, which has a select that picks whichever of the two channels delivers data first.

The problem is that no goroutine ever enters the second case. I can send 100 jobs one after another and I see all of them in the log for the first case: each worker does what is requested and then sends the job to the second channel (assuming that send actually works). After that, I get no more logs. I just don't understand why...

If someone can enlighten me :)

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {

    wg := new(sync.WaitGroup)
    in := make(chan *Job)
    out := make(chan *Job)
    results := make(chan *Job)

    for i := 0; i < 50; i++ {
        go work(wg, in, out, results)
    }

    wg.Wait()

    // Finally we collect all the results of the work.
    for elem := range results {
        fmt.Println(elem)
    }
}

func Work(wg *sync.WaitGroup, in chan *Job, out chan *Job, results chan *Job) {
    wg.Add(1)
    defer wg.Done()
    for {
        select {
        case job := <-in:
            ticker := time.Tick(10 * time.Second)

            select {
            case <-ticker:
                // DO stuff
                if condition is true {
                    out <- job
                }
            case <-time.After(5 * time.Minute):
                fmt.Println("Timeout")
            }
        case job := <-out:
            ticker := time.Tick(1 * time.Minute)

            select {
            case <-ticker:
                // DO stuff
                if condition is true {
                    results <- job
                }

            case <-quitOut:
                fmt.Println("Job completed")
            }
        }
    }
}

I create a number of workers that listen on two channels and send the final results to a third.

Each worker does something with the job it receives. If the job satisfies a given condition, the worker passes it to the next channel, and if it then satisfies a second condition, the worker passes it to the results channel.

So, in my head I had a pipeline like this, for 5 workers for example: 3 jobs arrive on the IN channel and 3 workers pick them up immediately. If all 3 jobs satisfy the condition, they are sent to the OUT channel. The 2 idle workers pick up two of them immediately, and the third job is picked up by one of the first 3 workers ...

I hope this gives you a better understanding of my code above. But when I run it, I never get to the second case.

Upvotes: 0

Views: 4196

Answers (3)

ND003

Reputation: 802

Because your function is "Work" and you are calling "work". Go identifiers are case-sensitive, so the code as posted does not compile.

Upvotes: -1

reticentroot

Reputation: 3682

I think your solution might be a bit overcomplicated. Here is a simplified version. Bear in mind that there are numerous possible implementations. A good article to read:

https://medium.com/smsjunk/handling-1-million-requests-per-minute-with-golang-f70ac505fcaa

Or, even better, straight from Go by Example:

https://gobyexample.com/worker-pools (which I think may be what you were aiming for)

Anyway, the code below is a different kind of example. There are several ways to go about solving this problem.

package main

import (
    "context"
    "log"
    "os"
    "sync"
    "time"
)

type worker struct {
    wg   *sync.WaitGroup
    in   chan job
    quit context.Context
}

type job struct {
    message int
}

func main() {
    numberOfJobs := 50

    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    w := worker{
        wg:   &sync.WaitGroup{},
        in:   make(chan job),
        quit: ctx,
    }

    for i := 0; i < numberOfJobs; i++ {
        go func(i int) {
            w.in <- job{message: i}
        }(i)
    }

    counter := 0
    for {
        select {
        case j := <-w.in:
            counter++
            log.Printf("Received job %+v\n", j)
            // DO SOMETHING WITH THE RECEIVED JOB
            // WORKING ON IT
            x := j.message * j.message
            log.Printf("job processed, result %d", x)
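        case <-w.quit.Done():
            log.Printf("Received quit, timeout reached.  Number of jobs queued: %d, Number of jobs complete: %d\n", numberOfJobs, counter)
            os.Exit(0)
        // No default case here: the select blocks until a job or the timeout
        // arrives instead of spinning in a busy loop.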
        }
    }

}
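
For comparison, a two-stage pipeline closer to what the question describes might look roughly like the following. This is only a sketch under assumptions: the Job fields, the stage and runPipeline helpers, and both filter conditions are made up for illustration and do not come from the original code. Each stage has its own pool of workers, calls wg.Add before starting its goroutines, and closes its output channel once its input is drained, so the final range loop terminates.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Job is a hypothetical stand-in for the question's Job type.
type Job struct {
	ID    int
	Value int
}

// stage starts n workers that read jobs from in and forward a job to the
// returned channel whenever fn reports true. The returned channel is closed
// after in has been closed and every worker has finished.
func stage(n int, in <-chan *Job, fn func(*Job) bool) <-chan *Job {
	out := make(chan *Job)
	var wg sync.WaitGroup
	wg.Add(n) // Add before starting the goroutines, not inside them.
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for job := range in {
				if fn(job) {
					out <- job
				}
			}
		}()
	}
	// Close out only after all workers of this stage are done.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPipeline pushes numJobs jobs through two stages and returns the IDs of
// the jobs that reach the end, sorted for a deterministic result.
func runPipeline(numJobs, workers int) []int {
	in := make(chan *Job)
	go func() {
		defer close(in) // signals stage 1 that no more jobs will arrive
		for i := 0; i < numJobs; i++ {
			in <- &Job{ID: i, Value: i}
		}
	}()

	// Stage 1 (assumed condition): keep even values and double them.
	first := stage(workers, in, func(j *Job) bool {
		if j.Value%2 != 0 {
			return false
		}
		j.Value *= 2
		return true
	})
	// Stage 2 (assumed condition): keep values divisible by 3.
	results := stage(workers, first, func(j *Job) bool {
		return j.Value%3 == 0
	})

	var ids []int
	for j := range results {
		ids = append(ids, j.ID)
	}
	sort.Ints(ids)
	return ids
}

func main() {
	fmt.Println(runPipeline(20, 5)) // prints [0 6 12 18]
}
```

Because main keeps reading from the last channel while the workers run, no worker stays blocked on a send, and closing the channels stage by stage replaces the need for a separate wg.Wait() in main.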

Upvotes: 3

Volker

Reputation: 42413

Your quitIn and quitOut channels are basically useless: you create them and try to receive from them, which can never succeed, because nobody can write to these channels when nobody else even knows they exist. I cannot say more because I do not understand what the code is supposed to do.
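
To illustrate the point, here is a minimal sketch of a quit channel that does work, assuming the channel is created by the caller and shared with every worker. The names runWorkers, jobs, and quit are hypothetical and not taken from the question. Closing the channel makes the `<-quit` case ready in all workers at once, which is the usual way to broadcast a stop signal.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n workers, feeds them a few jobs, then closes the shared
// quit channel. It returns how many workers observed the quit signal.
func runWorkers(n int) int {
	quit := make(chan struct{}) // owned by the caller, shared with all workers
	jobs := make(chan int)

	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case j := <-jobs:
					_ = j * j // placeholder for real work
				case <-quit:
					// A closed channel is always ready to receive,
					// so every worker ends up here.
					mu.Lock()
					stopped++
					mu.Unlock()
					return
				}
			}
		}()
	}

	for j := 0; j < 10; j++ {
		jobs <- j
	}
	close(quit) // broadcast: wakes every worker blocked in select
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println(runWorkers(4)) // prints 4
}
```

If the quit channel were created inside each worker instead, no other goroutine could ever send on it or close it, and that select case would never fire.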

Upvotes: 0
