Reputation: 219
package main
import (
"fmt"
"strconv"
"time"
)
func generator() chan int {
ch := make(chan int)
go func() {
i := 0
for {
i++
ch <- i
time.Sleep(time.Duration(10) * time.Millisecond)
}
}()
return ch
}
func printer(delay int, square bool, ch chan int) {
for n := range ch {
if square {
fmt.Printf("["+strconv.Itoa(n) + "],")
} else {
fmt.Printf("("+strconv.Itoa(n) + "),")
}
time.Sleep(time.Duration(delay) * time.Millisecond)
}
}
func sendToBoth(ch chan int) (ch1 chan int, ch2 chan int) {
ch1 = make(chan int)
ch2 = make(chan int)
go func() {
for {
//n := <- ch
select {
case v := <- ch: //this is the problem point
ch1 <- v //
case v := <- ch: //
ch2 <- v //
}
}
}()
return ch1, ch2
}
func main() {
ch1, ch2 := sendToBoth(generator())
go printer(100, true, ch1) //[]
go printer(200, false, ch2) //()
var name string
fmt.Scan(&name)
}
I want to implement a sendToBoth function that receives the generated numbers 1, 2, 3, ... from channel ch and sends each one to both ch1 and ch2. The two consumers have different delays, and I don't want one to wait for the other to unblock, so I tried to use select, but I can't figure out how to ask in a case clause whether ch1 or ch2 is available at the moment. Any help?
The output should look like:
(1),[1],[2],(2),[3],[4],(3),[5],[6],(4),[7],[8],...
Upvotes: 1
Views: 102
Reputation: 417757
If values from ch1 and ch2 are consumed at different rates and you want to distribute to both without one having to wait for the other, then sendToBoth() has to buffer the values until they are received. In your example, this buffer would grow continuously, because the generator produces a value every 10 ms while the printers consume one only every 100 ms and 200 ms. How do you want to handle that?
One very simple and reasonable way to implement this buffering is to create, use and return buffered channels:
func sendToBoth(ch chan int) (ch1 chan int, ch2 chan int) {
ch1 = make(chan int, 100)
ch2 = make(chan int, 100)
go func() {
for n := range ch {
ch1 <- n
ch2 <- n
}
}()
return ch1, ch2
}
And that's all. Sending on ch1 and ch2 will not block as long as their buffers aren't full. Of course, if the buffer of ch1 or ch2 is full, further sends on it have to wait until an element is received from it, but that must be an acceptable compromise; otherwise there is no limit to the growth of the buffered elements. You may use any buffer size you wish.
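To make the "non-blocking while the buffer isn't full" behaviour concrete, and to show one way of asking whether a channel can take a value right now, here is a minimal sketch. The helper name trySend is hypothetical (it is not part of the code above); it relies only on select with a default branch and on the built-in len and cap.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts to send v on ch without
// blocking and reports whether the value was accepted.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full (or, for an unbuffered channel, no receiver
		// is ready), so the send would have blocked.
		return false
	}
}

func main() {
	ch := make(chan int, 2) // deliberately small buffer so it fills quickly
	for i := 1; i <= 3; i++ {
		ok := trySend(ch, i)
		fmt.Printf("send %d accepted=%v buffered=%d/%d\n", i, ok, len(ch), cap(ch))
	}
}
```

With nothing receiving from ch, the first two sends are accepted and the third is rejected. Note that a rejected value is simply not sent in this sketch; whether to drop it, retry it, or store it elsewhere is a design decision the caller has to make.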
With this change, output will be as you wish it to be:
[1],(1),[2],(2),[3],[4],(3),[5],[6],(4),[7],[8],(5),[9],[10],(6),[11],[12],(7),[13],[14],(8),[15],[16],(9),[17],[18],
See related question: How to broadcast message using channel
Upvotes: 1
Reputation: 213358
So I’ll say that my first reaction is “just let them run in step, it’s a lot easier.”
func sendToBoth(ch chan int) (ch1, ch2 chan int) {
ch1 = make(chan int)
ch2 = make(chan int)
go func() {
defer close(ch1)
defer close(ch2)
for n := range ch {
ch1 <- n
ch2 <- n
}
}()
return ch1, ch2
}
So simple, I love it! But let’s say that you want ch1 and ch2 to get consumed each at their own rate. If you want them to drift apart from each other, you have to use temporary storage to deal with that. The easy way out is to just give channels some buffer space:
ch1 = make(chan int, 10)
ch2 = make(chan int, 10)
Now, ch1 can go faster—but it can only get 10 items ahead of ch2. Or vice versa.
If you want an unlimited size buffer, you have to keep it yourself. We can exploit the fact that a nil channel is perfectly acceptable to use as a select branch (a send or receive on a nil channel never proceeds, so that branch is simply never chosen):
func sendToBoth(ch chan int) (ch1, ch2 chan int) {
ch1 = make(chan int)
ch2 = make(chan int)
go func() {
defer close(ch1)
defer close(ch2)
var arr []int
var pos1, pos2 int
ich := ch
// Keep going while the input is still open or either output has pending values.
for ich != nil || pos1 < len(arr) || pos2 < len(arr) {
var och1, och2 chan int
var v1, v2 int
if pos1 < len(arr) {
och1 = ch1
v1 = arr[pos1]
}
if pos2 < len(arr) {
och2 = ch2
v2 = arr[pos2]
}
select {
case n, ok := <- ich:
if !ok {
ich = nil // done
} else {
arr = append(arr, n)
}
case och1 <- v1:
pos1++
case och2 <- v2:
pos2++
}
}
}()
return ch1, ch2
}
A fair bit more complicated—I’m not entirely sure this is correct. Also note that there is no effort to release storage for old items in the stream.
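As a hedged sketch of how the storage could be released, the variant below uses the same nil-channel technique but drops the prefix of the slice that both outputs have already received. The name fanOutUnbounded and the trimming strategy are assumptions made for illustration, not part of the answer above.

```go
package main

import "fmt"

// fanOutUnbounded is a hypothetical variant of sendToBoth: it buffers without
// a fixed limit and discards values once both outputs have received them.
func fanOutUnbounded(in <-chan int) (<-chan int, <-chan int) {
	out1 := make(chan int)
	out2 := make(chan int)
	go func() {
		defer close(out1)
		defer close(out2)
		var buf []int      // values not yet delivered to both outputs
		var pos1, pos2 int // next index in buf for each output
		src := in
		for src != nil || pos1 < len(buf) || pos2 < len(buf) {
			// A nil channel is never ready, so an output with nothing
			// pending drops out of the select below.
			var send1, send2 chan<- int
			var v1, v2 int
			if pos1 < len(buf) {
				send1, v1 = out1, buf[pos1]
			}
			if pos2 < len(buf) {
				send2, v2 = out2, buf[pos2]
			}
			select {
			case n, ok := <-src:
				if !ok {
					src = nil // input closed: stop receiving, keep draining
				} else {
					buf = append(buf, n)
				}
			case send1 <- v1:
				pos1++
			case send2 <- v2:
				pos2++
			}
			// Drop the prefix both outputs have consumed. The old backing
			// array becomes collectible once append reallocates.
			m := pos1
			if pos2 < m {
				m = pos2
			}
			if m > 0 {
				buf = buf[m:]
				pos1 -= m
				pos2 -= m
			}
		}
	}()
	return out1, out2
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	out1, out2 := fanOutUnbounded(in)
	// Read everything from out1 before touching out2; this only works
	// because the fan-out goroutine buffers the values for out2.
	for i := 0; i < 5; i++ {
		fmt.Printf("[%d],", <-out1)
	}
	for v := range out2 {
		fmt.Printf("(%d),", v)
	}
	fmt.Println()
}
```

Both outputs are closed only after the input is closed and both consumers have drained their pending values, so main reads a fixed count from out1 instead of ranging over it. The buffer is still unbounded if one consumer stops reading altogether.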
Upvotes: 1