Reputation: 5293
I have looked at this, this, this and this but none really help me in this situation. I have multiple goroutines that need to do some task if the value in the channel is for that particular goroutine.
var uuidChan chan string
func handleEntity(entityUuid string) {
go func() {
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
}
case <-time.After(time.Second * 5):
println("Timeout")
return
}
}
}()
}
func main() {
uuidChan = make(chan (string))
for i := 0; i < 5; i++ {
handleEntity(fmt.Sprintf("%d", i))
}
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
}
https://play.golang.org/p/Pu5MhSP9Qtj
In the above logic, uuid is received by one of the channels and nothing happens. To solve this, I tried changing the logic to reinsert the uuid back into the channel if logic for some uuid is not in that routine. I know its a bad practice and that doesn't work too.
func handleEntity(entityUuid string) {
go func() {
var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.
for {
select {
case uuid := <-uuidChan:
if uuid == entityUuid {
// logic
println(uuid)
return
} else {
notMe = append(notMe, uuid)
}
case <-time.After(time.Second * 5):
println("Timeout")
defer func() {
for _, uuid := range notMe {
uuidChan <- uuid
}
}()
return
}
}
}()
}
https://play.golang.org/p/5On-Vd7UzqP
What could be the correct way to do this?
Upvotes: 2
Views: 2506
Reputation: 16263
You have a box with a label inside it, so a reciever should read the label first then decide what to do with it. If you place the label inside the box - you are forcing the reiever to open the box (see the solution number 1). I would encourage you do a better postal service and place the label at least outside of the box (see the solution number 3)- or better deleiver the box to a correct address at once (see the solution number 2):
There are many solutions to this and you only limited by your imagination:
1.
Since you have only one channel with a data with an ID inside it for a consumer with an ID, and you can only read a data from the channel once (assuming the oreder of data inside the channel is important) - you have a simple sulotion: Use a reading goroutine which reads a data from the channel then apply the logic to decide what to do with this data - e.g. send it to another goroutine or run a task.
Try this:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
uuidChan := make(chan string)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
t := time.NewTimer(5 * time.Second)
defer t.Stop()
for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("Channel closed.")
return
}
// logic:
wg.Add(1)
// Multiple goroutines listening selectively on one channel
go consume(uuid, &wg)
// switch uuid {case 1: go func1(); case 2: go func2()}
case <-t.C:
fmt.Println("Timeout")
return
}
}
}()
for i := 0; i < 4; i++ {
uuidChan <- fmt.Sprintf("%d", i)
}
close(uuidChan) // free up the goroutine
wg.Wait() // wait until all consumers are done
fmt.Println("All done.")
}
// Multiple goroutines listening selectively on one channel
func consume(uuid string, wg *sync.WaitGroup) {
defer wg.Done()
// logic: or decide here based on uuid
fmt.Println("job #:", uuid) // job
}
Output:
job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.
package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
defer wg.Done()
// for {
select {
case uuid, ok := <-uuidChan:
if !ok {
fmt.Println("closed")
return // free up goroutine on chan closed
}
fmt.Println(uuid)
return // job done
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
return
}
// }
}
func main() {
const max = 5
slice := make([]chan string, max)
var wg sync.WaitGroup
for i := 0; i < max; i++ {
slice[i] = make(chan string, 1)
wg.Add(1)
go handleEntity(slice[i], &wg)
}
for i := 0; i < 4; i++ {
slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel
}
wg.Wait()
fmt.Println("All done.")
}
Output:
3
0
1
2
Timeout
All done.
label
and signal broadcast of sync.Cond
:label
we add the address of the reciever on top of the box.
Here using a shared resource named label
first set the box label
to a desired ID then using signal broadcast inform all listenning goroutines to wake up and check the label
and time to see if one is addressed and expired or not then all go back to wait state and the addressed or expired one continues to read the unbuffered channel or exit. Then using the time.AfterFunc
to signal the expiration of the remaining goroutine(s) and finally wg.Wait()
for them all to join. Note that the first c.Broadcast()
should be called after c.Wait()
- meaning the goroutines should be running before the first call to c.Broadcast()
, so one way is to simply use another sync.WaitGroup
named w4w
short for wait for wait
.package main
import (
"fmt"
"sync"
"time"
)
func handleEntity(entityUuid string) {
defer wg.Done()
t0 := time.Now()
var expired, addressed bool
w4w.Done()
m.Lock()
for !expired && !addressed {
c.Wait()
addressed = label == entityUuid
expired = time.Since(t0) > d
}
m.Unlock()
fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)
if !expired && addressed {
uuid := <-uuidChan
fmt.Println("matched =", entityUuid, uuid)
}
fmt.Println("done", entityUuid)
}
func main() {
for i := 0; i < 5; i++ {
w4w.Add(1)
wg.Add(1)
go handleEntity(fmt.Sprintf("%d", i))
}
w4w.Wait()
time.AfterFunc(d, func() {
// m.Lock()
// label = "none"
// m.Unlock()
fmt.Println("expired")
c.Broadcast() // expired
})
for i := 0; i < 4; i++ {
m.Lock()
label = fmt.Sprintf("%d", i)
m.Unlock()
c.Broadcast() // notify all
uuidChan <- label
}
fmt.Println("...")
wg.Wait()
fmt.Println("all done")
}
var (
label string
uuidChan = make(chan string)
m sync.Mutex
c = sync.NewCond(&m)
w4w, wg sync.WaitGroup
d = 1 * time.Second
)
Output:
id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done
Upvotes: 4
Reputation: 2625
maybe you want to map your channels to send the message to correct goroutine right away:
package main
import (
"fmt"
"time"
)
func worker(u string, c chan string) {
for {
fmt.Printf("got %s in %s\n", <-c, u)
}
}
func main() {
workers := make(map[string]chan string)
for _, u := range []string{"foo", "bar", "baz"} {
workers[u] = make(chan string)
go worker(u, workers[u])
}
workers["foo"] <- "hello"
workers["bar"] <- "world"
workers["baz"] <- "!"
fmt.Println()
time.Sleep(time.Second)
}
Upvotes: 4