subtleseeker
subtleseeker

Reputation: 5293

Multiple goroutines listening selectively on one channel

I have looked at this, this, this and this but none really help me in this situation. I have multiple goroutines that need to do some task if the value in the channel is for that particular goroutine.

// uuidChan is the one channel shared by every goroutine that handleEntity
// starts. It is only declared here; main creates it before use.
var uuidChan chan string

// handleEntity starts a goroutine that waits on the shared uuidChan for a
// value equal to entityUuid.
//
// A matching value is printed and ends the goroutine. A non-matching value is
// simply dropped and the wait starts over with a fresh five-second timeout.
// If nothing arrives within five seconds the goroutine prints "Timeout" and
// exits.
func handleEntity(entityUuid string) {
    listen := func() {
        for {
            select {
            case got := <-uuidChan:
                if got != entityUuid {
                    // Not addressed to this goroutine. The value has already
                    // been taken off the channel, so it is lost here.
                    continue
                }
                // logic
                println(got)
                return
            case <-time.After(5 * time.Second):
                println("Timeout")
                return
            }
        }
    }
    go listen()
}

// main creates the shared channel, starts five listeners with the ids "0"
// through "4", and then sends the ids "0" through "3" on the channel.
func main() {
    const (
        listeners = 5
        messages  = 4
    )

    uuidChan = make(chan string)

    for id := 0; id < listeners; id++ {
        handleEntity(fmt.Sprint(id))
    }
    for id := 0; id < messages; id++ {
        uuidChan <- fmt.Sprint(id)
    }
}

https://play.golang.org/p/Pu5MhSP9Qtj

In the above logic, each uuid is received by only one of the goroutines, and if it is not the goroutine that uuid is meant for, nothing happens. To solve this, I tried changing the logic to reinsert the uuid back into the channel if the logic for that uuid is not in that routine. I know it's bad practice, and it doesn't work either.

// handleEntity starts a goroutine that waits on the shared uuidChan for a
// value equal to entityUuid.
//
// A matching value is printed and ends the goroutine. Values that do not
// match are remembered, and the wait restarts with a fresh five-second
// timeout. When the timeout fires the goroutine prints "Timeout", sends every
// remembered value back on uuidChan, and exits.
func handleEntity(entityUuid string) {
    go func() {
        // Ids received here that this routine cannot handle.
        var others []string
        for {
            select {
            case got := <-uuidChan:
                if got == entityUuid {
                    // logic
                    println(got)
                    return
                }
                others = append(others, got)
            case <-time.After(5 * time.Second):
                println("Timeout")
                // Re-insert the foreign ids into the channel before exiting.
                for _, id := range others {
                    uuidChan <- id
                }
                return
            }
        }
    }()
}

https://play.golang.org/p/5On-Vd7UzqP

What could be the correct way to do this?

Upvotes: 2

Views: 2506

Answers (2)

wasmup
wasmup

Reputation: 16263

You have a box with a label inside it, so a receiver has to read the label first and then decide what to do with it. If you place the label inside the box, you are forcing the receiver to open the box (see solution number 1). I would encourage you to run a better postal service and place the label at least on the outside of the box (see solution number 3), or better, deliver the box to the correct address at once (see solution number 2):

There are many solutions to this, and you are only limited by your imagination:
1. Since you have only one channel carrying data with an ID inside it for a consumer with an ID, and you can only read a piece of data from the channel once (assuming the order of data inside the channel is important), you have a simple solution: use a reading goroutine which reads the data from the channel and then applies the logic to decide what to do with this data, e.g. send it to another goroutine or run a task.
Try this:

package main

import (
    "fmt"
    "sync"
    "time"
)

// main demonstrates a single dispatching goroutine: it is the only reader of
// the channel and starts one consume goroutine per value received.
func main() {
    var wg sync.WaitGroup
    jobs := make(chan string)

    // dispatch reads ids until the channel is closed or the five-second
    // timer (started once, never reset) fires.
    dispatch := func() {
        defer wg.Done()

        deadline := time.NewTimer(5 * time.Second)
        defer deadline.Stop()

        for {
            select {
            case <-deadline.C:
                fmt.Println("Timeout")
                return

            case id, open := <-jobs:
                if !open {
                    fmt.Println("Channel closed.")
                    return
                }
                // logic:
                // Multiple goroutines listening selectively on one channel.
                // Registered before the dispatcher's own Done can run, so the
                // WaitGroup counter cannot reach zero early.
                wg.Add(1)
                go consume(id, &wg)
                // switch uuid {case 1: go func1(); case 2: go func2()}
            }
        }
    }

    wg.Add(1)
    go dispatch()

    for i := 0; i < 4; i++ {
        jobs <- fmt.Sprint(i)
    }
    close(jobs) // free up the goroutine

    wg.Wait() // wait until the dispatcher and all consumers are done
    fmt.Println("All done.")
}

// consume runs the job identified by uuid and reports completion on wg.
// The caller must have called wg.Add(1) for this job beforehand.
// This is also a place where the per-uuid decision logic could live.
func consume(uuid string, wg *sync.WaitGroup) {
    // Deferred so the WaitGroup is released however the job ends.
    defer wg.Done()

    const prefix = "job #:"
    fmt.Println(prefix, uuid) // job
}

Output:

job #: 0
job #: 2
job #: 1
Channel closed.
job #: 3
All done.

  2. Using a channel per goroutine, try this:
package main

import (
    "fmt"
    "sync"
    "time"
)

// handleEntity waits for a single value on its own uuidChan and reports
// completion on wg when it returns.
//
// It prints the received value, prints "closed" if the channel was closed
// instead, or prints "Timeout" if neither happens within one second.
// Wrapping the select in a loop would turn this into a long-lived worker.
func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {
    defer wg.Done()

    timeout := time.After(1 * time.Second)
    select {
    case uuid, ok := <-uuidChan:
        if ok {
            fmt.Println(uuid) // job done
            return
        }
        // The channel was closed: nothing to do, free up the goroutine.
        fmt.Println("closed")

    case <-timeout:
        fmt.Println("Timeout")
    }
}

// main gives each of five goroutines its own buffered channel and sends a
// value to the first four only, so each value goes straight to its intended
// receiver.
func main() {
    const workers = 5
    var wg sync.WaitGroup

    inboxes := make([]chan string, 0, workers)
    for len(inboxes) < workers {
        inbox := make(chan string, 1)
        inboxes = append(inboxes, inbox)

        wg.Add(1)
        go handleEntity(inbox, &wg)
    }

    // Send to the numbered channels 0..3; the last worker gets nothing.
    for i, inbox := range inboxes[:4] {
        inbox <- fmt.Sprint(i)
    }

    wg.Wait()
    fmt.Println("All done.")
}

Output:

3
0
1
2
Timeout
All done.

  3. Using label and signal broadcast of sync.Cond:
    So we have a box, and using a shared variable named label we put the address of the receiver on top of the box. First set the shared label to the desired ID, then use a signal broadcast to tell all listening goroutines to wake up and check the label and the time, to see whether they are addressed or expired. All the others go back to the wait state, while the addressed one continues to read the unbuffered channel and an expired one exits. Then time.AfterFunc is used to signal the expiration of the remaining goroutine(s), and finally wg.Wait() waits for them all to join. Note that the first c.Broadcast() should be called after c.Wait() - meaning the goroutines should be running before the first call to c.Broadcast() - so one way is to simply use another sync.WaitGroup named w4w, short for wait for wait.
package main

import (
    "fmt"
    "sync"
    "time"
)

// handleEntity waits on the shared condition variable c until either the
// shared label equals entityUuid ("addressed") or more than d has elapsed
// since the call started ("expired"). An addressed, non-expired goroutine
// then receives one value from uuidChan. wg.Done is called on return.
func handleEntity(entityUuid string) {
    defer wg.Done()
    t0 := time.Now()
    var expired, addressed bool

    m.Lock()
    // Announce readiness only while holding m. main must take m before it
    // changes label and broadcasts, and m is not released here until c.Wait
    // has queued this goroutine, so the first broadcast cannot be missed.
    // Calling w4w.Done before m.Lock left a window for exactly that.
    w4w.Done()
    for !expired && !addressed {
        c.Wait()
        // Re-evaluate after every wake-up: broadcasts wake all waiters.
        addressed = label == entityUuid
        expired = time.Since(t0) > d
    }
    m.Unlock()

    fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)
    if !expired && addressed {
        uuid := <-uuidChan
        fmt.Println("matched =", entityUuid, uuid)
    }
    fmt.Println("done", entityUuid)
}

// main starts five handleEntity goroutines, addresses the first four one by
// one through the shared label, and lets the remaining one expire.
func main() {
    for i := 0; i < 5; i++ {
        w4w.Add(1)
        wg.Add(1)
        go handleEntity(fmt.Sprintf("%d", i))
    }
    // Block until every goroutine has called w4w.Done.
    w4w.Wait()

    // After d, wake all waiters once more so that any goroutine still
    // waiting can observe that its time is up.
    time.AfterFunc(d, func() {
        // m.Lock()
        // label = "none"
        // m.Unlock()
        fmt.Println("expired")
        c.Broadcast() // expired
    })

    for i := 0; i < 4; i++ {
        // Write the label under the mutex, wake every waiter, then hand the
        // value over on the unbuffered channel; the send blocks until a
        // goroutine receives it.
        m.Lock()
        label = fmt.Sprintf("%d", i)
        m.Unlock()
        c.Broadcast() // notify all
        uuidChan <- label
    }

    fmt.Println("...")
    // Wait for all five goroutines to return.
    wg.Wait()
    fmt.Println("all done")
}

// Package-level state shared by main and the handleEntity goroutines.
var (
    // d is the expiry period: the delay before the final broadcast and the
    // age after which a waiting goroutine counts as expired.
    d = 1 * time.Second

    // label holds the id currently being addressed; it is written under m.
    label string
    m     sync.Mutex
    // c is the condition variable, bound to m, used to wake all waiters.
    c = sync.NewCond(&m)

    // uuidChan is unbuffered, so a send completes only when it is received.
    uuidChan = make(chan string)

    // w4w ("wait for wait") tracks goroutine start-up; wg tracks completion.
    w4w sync.WaitGroup
    wg  sync.WaitGroup
)

Output:

id = 0 addressed = true expired = false
matched = 0 0
done 0
id = 1 addressed = true expired = false
matched = 1 1
done 1
id = 2 addressed = true expired = false
matched = 2 2
done 2
id = 3 addressed = true expired = false
matched = 3 3
done 3
...
expired
id = 4 addressed = false expired = true
done 4
all done

Upvotes: 4

Jakub Dóka
Jakub Dóka

Reputation: 2625

Maybe you want to map your channels so that the message is sent to the correct goroutine right away:

package main

import (
    "fmt"
    "time"
)

// worker prints every message received on c, tagged with the worker name u.
//
// It returns once c is closed and drained. A bare receive in an endless loop
// would instead keep receiving zero values from a closed channel and print
// empty messages forever.
func worker(u string, c chan string) {
    for msg := range c {
        fmt.Printf("got %s in %s\n", msg, u)
    }
}

// main keeps one channel per worker in a map keyed by worker name, so each
// message is sent directly to the goroutine it is meant for.
func main() {
    names := []string{"foo", "bar", "baz"}
    messages := []string{"hello", "world", "!"}

    workers := make(map[string]chan string)
    for _, name := range names {
        inbox := make(chan string)
        workers[name] = inbox
        go worker(name, inbox)
    }

    // Route each message by name, in order: foo, bar, baz.
    for i, name := range names {
        workers[name] <- messages[i]
    }

    fmt.Println()

    // Pause for a second before exiting so the workers can print.
    time.Sleep(time.Second)
}

Upvotes: 4

Related Questions