I'm trying to understand how to process a queue in Go when the queue can grow from within the processing function itself. See the code below.
In this pseudocode, I want to limit the number of handlers to 10, so I start 10 handlers that process the queue. I then start the queue off with a URL.
My issue is that, according to the docs, a send on an unbuffered channel blocks until a receiver receives the data. In the code below, each process is a receiver that handles a new URL. However, it's easy to see that if a process sends 11 links to the queue, it will block once the other receivers are all busy handling the links it already sent. If those receivers each find even one link, they also block while sending that link to the queue. Since everyone is blocked on a send, nobody is receiving, and nothing finishes.
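A small sketch of that blocking behaviour (not from the original post). `trySend` is a hypothetical helper that uses `select` with a `default` case to attempt a send without blocking, which shows that a send on an unbuffered channel cannot complete while no receiver is waiting, and does complete once one is.

```go
package main

import "fmt"

// trySend is a hypothetical helper: select with a default case takes the
// default branch when the send cannot proceed immediately, so it never blocks.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string) // unbuffered, like queue in the question

	// No goroutine is receiving, so a plain `ch <- "x"` would block here forever.
	fmt.Println(trySend(ch, "x")) // false

	got := make(chan string)
	go func() { got <- <-ch }() // start a receiver
	ch <- "y"                   // blocks until that receiver takes the value
	fmt.Println(<-got)          // y
}
```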
I'm wondering what the general solution is in Go for processing a queue that can grow from the processing itself. I think I could do this with a lock around a slice called queue, but I'm trying to understand how it would be done with channels.
var queue = make(chan string)

func process() {
	for currentURL := range queue {
		links, _ := ... // some http call that gets links from currentURL
		for _, link := range links {
			queue <- link // blocks until another process is free to receive
		}
	}
}

func main() {
	for i := 0; i < 10; i++ {
		go process()
	}
	queue <- "https://stackoverflow.com"
	...
	// block until receive some quit message
	<-quit
}
One simple method is to move the code that adds the links to the channel into its own goroutine. That way your main processing can continue, while the blocked channel write blocks a separate goroutine instead.
func process() {
	for currentURL := range queue {
		links, _ := ... // some http call that gets links from currentURL
		for _, link := range links {
			// Pass link as an argument so each goroutine gets its own copy.
			// Before Go 1.22 the loop reused a single link variable, so a
			// closure capturing it directly could see a later value.
			go func(l string) {
				queue <- l // we'll be blocked here...
				// but the "parent" routine can still iterate through the channel,
				// which in turn unblocks the write
			}(link)
		}
	}
}
Edit: here is a semaphore example (using golang.org/x/sync/semaphore) to limit the number of goroutines:
import (
	"context"

	"golang.org/x/sync/semaphore"
)

func main() {
	maxWorkers := 5000
	sem := semaphore.NewWeighted(int64(maxWorkers))
	ctx := context.TODO()
	for i := 0; i < 10; i++ {
		go process(ctx, sem) // sem must be passed in; it is local to main
	}
	queue <- "https://stackoverflow.com"
	// block until receive some quit message
	<-quit
}

func process(ctx context.Context, sem *semaphore.Weighted) {
	for currentURL := range queue {
		links, _ := ... // some http call that gets links from currentURL
		for _, link := range links {
			// Acquire a goroutine slot. If we are at the routine limit, this
			// blocks until one becomes available (or ctx is cancelled).
			if err := sem.Acquire(ctx, 1); err != nil {
				return
			}
			go func(l string) {
				defer sem.Release(1)
				queue <- l // we'll be blocked here...
				// but the "parent" routine can still iterate through the channel,
				// which in turn unblocks the write
			}(link)
		}
	}
}
This option could end up causing deadlocks, though. If all the goroutine slots have been claimed, the parent loops can get stuck on sem.Acquire. The child goroutines are then blocked sending to the channel, because no parent is left to receive, so they never reach the deferred sem.Release. Off the top of my head, I'm struggling to come up with a nice way to deal with this. Perhaps an external in-memory queue rather than channels?
There are two things you can do. The first is to use a buffered channel, so that a send does not block even when nobody is receiving on the other end, as long as the buffer has room. That way you can push all the values into the channel at once.
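A sketch of the buffered-channel option (the helper `queueURLs` is hypothetical): sends succeed without any receiver only while the buffer has room, so for the question's queue, which can grow without limit, a buffer delays the deadlock rather than removing it.

```go
package main

import "fmt"

// queueURLs sends urls into a new buffered channel while nothing is receiving.
// This only works because len(urls) <= capacity: one more send would block
// forever, exactly like a send on an unbuffered channel.
func queueURLs(capacity int, urls []string) chan string {
	ch := make(chan string, capacity)
	for _, u := range urls {
		ch <- u // does not block while the buffer has room
	}
	return ch
}

func main() {
	ch := queueURLs(3, []string{"a", "b", "c"})
	fmt.Println(len(ch), cap(ch)) // 3 3
	fmt.Println(<-ch)             // a (values come out in FIFO order)
}
```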
A more efficient way is to check whether a value is available in the channel, or whether the channel has been closed, which the sender should do once all values are sent.
Receivers can test whether a channel has been closed by assigning a second parameter to the receive expression:
v, ok := <-ch
Here ok is false if there are no more values to receive and the channel is closed. You can check for a value in the channel using select, as in:
package main

import (
	"fmt"
	"sync"
)

var queue = make(chan int)
var wg sync.WaitGroup

func process() {
	values := []int{1, 2, 5, 3, 9, 7}
	for _, value := range values {
		queue <- value
	}
}

func main() {
	for i := 0; i < 10; i++ {
		go process()
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		// 10 producers x 6 values = 60 values in total.
		for j := 0; j < 60; j++ {
			select {
			case v := <-queue: // receive once and print that same value
				fmt.Println(v)
			}
		}
	}()
	wg.Wait()
	// Every value has been received, so no more sends will happen.
	close(queue)
}
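The code above never actually uses the v, ok form described earlier. A minimal sketch of it (the `drain` helper is hypothetical): values already buffered are still delivered after `close`, and only then does a receive report `ok == false` together with the element type's zero value.

```go
package main

import "fmt"

// drain receives until the channel is both closed and empty.
func drain(ch chan int) []int {
	var values []int
	for {
		v, ok := <-ch
		if !ok {
			return values // closed and no values left
		}
		values = append(values, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // the sender closes once all values are sent

	fmt.Println(drain(ch)) // [1 2]
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```

In practice, `for v := range ch` is the idiomatic shorthand for the same loop.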