enneenne
enneenne

Reputation: 219

Check if I can read from channel

package main

import (
    "fmt"
    "strconv"
    "time"
)

func generator() chan int {
    ch := make(chan int)
    go func() {
        i := 0
        for {
            i++
            ch <- i
            time.Sleep(time.Duration(10) * time.Millisecond)
        }
    }()
    return ch
}

func printer(delay int, square bool, ch chan int) {
    for n := range ch {
        if (square) {
            fmt.Printf("["+strconv.Itoa(n) + "],")
        } else {
            fmt.Printf("("+strconv.Itoa(n) + "),")
        }
        time.Sleep(time.Duration(delay) * time.Millisecond)
    }
}

func sendToBoth(ch chan int) (ch1 chan int, ch2 chan int) {
    ch1 = make(chan int)
    ch2 = make(chan int)
    go func() {
        for {
            //n := <- ch
            select {
                case v := <- ch: //this is the problem point
                    ch1  <- v    //
                case v := <- ch: //
                    ch2 <- v     //
            }
        }
    }()
    return ch1, ch2
}

func main() {
    ch1, ch2 := sendToBoth(generator())
    go printer(100, true, ch1) //[]
    go printer(200, false, ch2) //()
    var name string
    fmt.Scan(&name)
}

I want to implement sendToBoth function which gets generated number 1,2,3,... from channel ch and sends it to both ch1 and ch2. But each has a different delay and I don't want one to wait for other to unlock, so I tried to use select, but can't figure out how to ask for If ch1 or ch2 is available at the moment in case clause. Any help? Output should be like

(1),[1],[2],(2),[3],[4],(3),[5],[6],(4),[7],[8],...

Upvotes: 1

Views: 102

Answers (2)

icza
icza

Reputation: 417757

If values from ch1 and ch2 are consumed at a different rate and you want to distribute to both without one having to wait for the other, then the sendToBoth() have to buffer the values until they are received. In your example, this buffer would continuously grow. How do you want to handle that?

One very simple and reasonable way to implement this buffering is to create, use and return buffered channels:

func sendToBoth(ch chan int) (ch1 chan int, ch2 chan int) {
    ch1 = make(chan int, 100)
    ch2 = make(chan int, 100)
    go func() {
        for n := range ch {
            ch1 <- n
            ch2 <- n
        }
    }()
    return ch1, ch2
}

And that's all. Sending on ch1 and ch2 will be non-blocking for as long as their buffer isn't full. Of course if one of ch1's or ch2's buffer is full, further sends on it will have to wait until an element from it is received, but that must be an acceptable compromise, else there's no limit to the growth of the buffered elements. You may use any buffer size you wish.

With this change, output will be as you wish it to be:

[1],(1),[2],(2),[3],[4],(3),[5],[6],(4),[7],[8],(5),[9],[10],(6),[11],[12],(7),[13],[14],(8),[15],[16],(9),[17],[18],

See related question: How to broadcast message using channel

Upvotes: 1

Dietrich Epp
Dietrich Epp

Reputation: 213358

So I’ll say that my first reaction is “just let them run in step, it’s a lot easier.”

func sendToBoth(ch chan int) (ch1, ch2 chan int) {
    ch1 = make(chan int)
    ch2 = make(chan int)
    go func() {
        defer close(ch1)
        defer close(ch2)
        for n := range ch {
            ch1 <- n
            ch2 <- n
        }
    }()
    return ch1, ch2
}

So simple, I love it! But let’s say that you want ch1 and ch2 to get consumed each at their own rate. If you want them to drift apart from each other, you have to use temporary storage to deal with that. The easy way out is to just give channels some buffer space:

ch1 = make(chan int, 10)
ch2 = make(chan int, 10)

Now, ch1 can go faster—but it can only get 10 items ahead of ch2. Or vice versa.

If you want an unlimited size buffer, you have to keep it yourself. We can exploit the fact that a nil channel is perfectly acceptable to use as a select branch:

func sendToBoth(ch chan int) (ch1, ch2 chan int) {
    ch1 = make(chan int)
    ch2 = make(chan int)
    go func() {
        defer close(ch1)
        defer close(ch2)
        var arr []int
        var pos1, pos2 int
        ich := ch
        for inch != nil && (pos1 < len(arr) || pos2 < len(arr)) {
            var och1, och2 chan int
            var v1, v2 int
            if pos1 < len(arr) {
                och1 = ch1
                v1 = arr[pos1]
            }
            if pos2 < len(arr) {
                och2 = ch2
                v2 = arr[pos2]
            }
            select {
            case n, ok := <- ich:
                if !ok {
                    ich = nil // done
                } else {
                    arr = append(arr, n)
                }
            case och1 <- v1:
                pos1++
            case och2 <- v2:
                pos2++
            }
        }
    }()
    return ch1, ch2
}

A fair bit more complicated—I’m not entirely sure this is correct. Also note that there is no effort to release storage for old items in the stream.

Upvotes: 1

Related Questions