Wira Buana
Wira Buana

Reputation: 3

Nonblocking channel in "Go Concurrency Patterns: Timing out, moving on" blog

I read this blog about how to get data from multiple conns and I tried to give it a swing to understand how it works.

func Query(conns []int) string {
    ch := make(chan string)

    go func() {

        for m := range ch {
        log.Println("message => ",m)
        }   

    }()

    for i, conn := range conns {
        go func(c int,loop int) {
            log.Println("start", loop) 

            select {
            case ch <- get(conn,loop):
                log.Println("got", loop)  
            default:
                log.Println("skipped", loop)
            }

            log.Println("exited", loop)
        }(conn,i)
    }


    log.Println("wait")
    time.Sleep(5 * time.Second)
    
    return "done"
}

func get(i int, loop int) string {
   log.Println("process", loop)
   return fmt.Sprintf("return loop %d", loop)
}

If I comment the default case inside the select block, it will print all messages (expected output).

But if I use a non-blocking channel without commenting out the default case, the channel only prints one message (and others will print "skipped" log message). I can't understand why the incoming messages are coming to default case. Does it have a chance that all message will go to default case? I thought all messages will get printed because get function returns immediately.

Here's a link for Go Playground

Upvotes: 0

Views: 105

Answers (1)

shmsr
shmsr

Reputation: 4204

  1. Run go vet on your code and you'll notice it's throwing an warning loop variable conn captured by func literal as you have used the conn instead of the passed parameter i.e., c. So I have fixed that in my code. But that's orthogonal to your problem.
  2. Let's read about how select works (go-tour-5, go-tour-6):
* A select blocks until one of its cases can run, then it executes that case. 
It chooses one at random if multiple are ready.

* The default case in a select is run if no other case is ready.

So if you use select without it being enclosed by a for loop (eg. select { ... }), it can choose any case at random because multiple cases are ready or it might choose default because other case is not ready. And select blocks until one of its cases (including default) can run and then it exits out. But if you enclose it in a for loop (eg. for { select { ... } }) and modify your implementation then you can run select until it hits the case you want it to hit. And when it does, the goroutine spawned, exits.

So what you're thinking is basically a misconception. Every run gives a different result (sometimes same as well); there's no guarantee because select can choose any case if multiple cases are ready or if the cases are not ready, default case is used.

Give this program a try and understand the parts I've changed:

package main

import (
    "fmt"
    "log"
    "time"
)

func query(conns []int) string {
    ch := make(chan string)

    // receiver
    go func() {
        for m := range ch {
            log.Println("message: ", m)
        }
    }()

    // sender
    for i, conn := range conns {
        go func(c, loop int) {
            log.Println("start: ", loop)
            for {
                select {
                case ch <- get(c, loop):
                    log.Println("got: ", loop)
                    log.Println("exited: ", loop)
                    return
                default:
                    log.Println("skipped: ", loop)
                }
            }
        }(conn, i)
    }

    log.Println("wait")
    time.Sleep(5 * time.Second)

    return "done"
}

func get(i, loop int) string {
    log.Println("process: ", loop)
    return fmt.Sprintf("return loop: %d", loop)
}

func main() {
    res := query([]int{1, 2, 3})
    fmt.Println(res)
}

Go Playground Link

Output from one of the run(s):

2020/08/03 00:50:31 wait
2020/08/03 00:50:31 start:  0
2020/08/03 00:50:31 process:  0
2020/08/03 00:50:31 got:  0
2020/08/03 00:50:31 exited:  0
2020/08/03 00:50:31 start:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 start:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 skipped:  2
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 message:  return loop: 0
2020/08/03 00:50:31 process:  2
2020/08/03 00:50:31 got:  2
2020/08/03 00:50:31 exited:  2
2020/08/03 00:50:31 message:  return loop: 2
2020/08/03 00:50:31 skipped:  1
2020/08/03 00:50:31 process:  1
2020/08/03 00:50:31 got:  1
2020/08/03 00:50:31 exited:  1
2020/08/03 00:50:31 message:  return loop: 1
done

And notice I got all the return loop messages received by the receiver goroutine.


EDIT: We can see that the default case is used several times by the goroutines which suggest that channel must be blocked (case is not ready). It is due to the channel being unbuffered.

So ch := make(chan string) can be modified to make in non-blocking in nature by making it a buffered channel. Here's the link to the improved version on Go Playground.

package main

import (
    "fmt"
    "log"
    "time"
)

func query(conns []int) string {
    // buffered channel with a size of 3
    ch := make(chan string, 3)

    // receiver
    go func() {
        for m := range ch {
            log.Println("message: ", m)
        }
    }()

    // sender
    for i, conn := range conns {
        go func(c, loop int) {
            log.Println("start: ", loop)
            for {
                select {
                case ch <- get(c, loop):
                    log.Println("got: ", loop)
                    log.Println("exited: ", loop)
                    return
                default:
                    log.Println("skipped: ", loop)
                }
            }
        }(conn, i)
    }

    log.Println("wait")
    time.Sleep(5 * time.Second)

    return "done"
}

func get(i, loop int) string {
    log.Println("process: ", loop)
    return fmt.Sprintf("return loop: %d", loop)
}

func main() {
    res := query([]int{1, 2, 3})
    fmt.Println(res)
}

Output from one of the run(s):

2009/11/10 23:00:00 wait
2009/11/10 23:00:00 start:  0
2009/11/10 23:00:00 process:  0
2009/11/10 23:00:00 got:  0
2009/11/10 23:00:00 exited:  0
2009/11/10 23:00:00 start:  1
2009/11/10 23:00:00 process:  1
2009/11/10 23:00:00 got:  1
2009/11/10 23:00:00 exited:  1
2009/11/10 23:00:00 start:  2
2009/11/10 23:00:00 process:  2
2009/11/10 23:00:00 got:  2
2009/11/10 23:00:00 exited:  2
2009/11/10 23:00:00 message:  return loop: 0
2009/11/10 23:00:00 message:  return loop: 1
2009/11/10 23:00:00 message:  return loop: 2
done

Upvotes: 3

Related Questions