Reputation: 61
I have a situation where the main goroutine creates "x" goroutines, but it is only interested in waiting for "y" (y < x) of them to finish.
I was hoping to use WaitGroup, but WaitGroup only lets me wait on all of the goroutines. I cannot, for example, do this:
1. wg.Add(y)
2. Create "x" goroutines. These goroutines call wg.Done() when they finish.
3. wg.Wait()
This panics when the (y+1)th goroutine calls wg.Done(), because the WaitGroup counter goes negative.
I could certainly use channels to solve this, but I am interested in whether WaitGroup can.
Upvotes: 3
Views: 1953
Reputation: 1
You can use two wait groups: one for the y goroutines and another for the (x-y) goroutines. For example:
package main

import (
	"fmt"
	"sync"
)

// Implement the fan-in/fan-out pattern:
// scrape multiple URLs from a list,
// here with 10 URLs and 3 workers.
func fanOut(results chan string, numOfWorkers int, urls []string, pwg *sync.WaitGroup) {
	urlChannel := make(chan string, len(urls))
	addUrlToChannel(urls, urlChannel)
	for i := 0; i < numOfWorkers; i++ {
		pwg.Add(1)
		go processWorker(pwg, urlChannel, results)
	}
	pwg.Wait()
	close(results)
}

func addUrlToChannel(urls []string, urlChannel chan string) {
	for _, url := range urls {
		urlChannel <- url
	}
	close(urlChannel)
}

func processWorker(pwg *sync.WaitGroup, urlChannel chan string, results chan string) {
	for url := range urlChannel {
		scrapeUrl(url, results)
	}
	pwg.Done()
}

func scrapeUrl(url string, results chan<- string) {
	results <- fmt.Sprintf("Successfully scraped %s: ", url)
}

func fanIn(scrapedUrls chan string, cwg *sync.WaitGroup) {
	defer cwg.Done()
	for url := range scrapedUrls {
		fmt.Println("Scraped url", url)
	}
}

func main() {
	urls := []string{
		"https://www.google.com",
		"https://www.github.com",
		"https://www.stackoverflow.com",
		"https://www.github.com",
		"https://www.stackoverflow.com",
		"https://www.google.com",
		"https://www.github.com",
		"https://www.stackoverflow.com",
		"https://www.google.com",
		"https://www.github.com",
	}
	results := make(chan string)
	var pwg sync.WaitGroup
	var cwg sync.WaitGroup
	numOfWorkers := 3

	// Fan-in
	cwg.Add(1)
	go fanIn(results, &cwg)

	// Fan-out
	fanOut(results, numOfWorkers, urls, &pwg)
	cwg.Wait()
	fmt.Println("Application ended")
}
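The program above uses its two wait groups for the workers and the consumer rather than for y and x-y goroutines. A more literal sketch of "one group for the y goroutines and another for the rest" follows, assuming for illustration that the goroutines you care about are simply the first y by index; `runTwoGroups` and the millisecond sleeps are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runTwoGroups (hypothetical) starts x goroutines. The first y (by index)
// are tracked by wgFirst, the remaining x-y by wgRest. It returns how many
// tracked goroutines had finished when wgFirst.Wait() returned.
func runTwoGroups(x, y int) int {
	var wgFirst, wgRest sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	wgFirst.Add(y)    // the goroutines we care about
	wgRest.Add(x - y) // everything else
	for i := 0; i < x; i++ {
		wg := &wgRest
		if i < y {
			wg = &wgFirst
		}
		go func(id int, wg *sync.WaitGroup) {
			defer wg.Done() // runs after the body, so finished++ is visible to Wait
			time.Sleep(time.Duration(id) * time.Millisecond) // stand-in for real work
			if id < y {
				mu.Lock()
				finished++
				mu.Unlock()
			}
		}(i, wg)
	}

	wgFirst.Wait() // returns once the y tracked goroutines are done
	mu.Lock()
	n := finished
	mu.Unlock()
	fmt.Printf("%d tracked goroutines done, continuing\n", n)

	wgRest.Wait() // optional: later, make sure the others have exited too
	return n
}

func main() {
	runTwoGroups(10, 3)
}
```

Because each group's Add() matches exactly the number of Done() calls made on it, neither counter can go negative.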
Upvotes: 0
Reputation: 11
I would recommend using sync.Cond for this use case: each background task increments a shared counter and signals the condition, and the main goroutine wakes up on each signal and rechecks the count until enough goroutines have completed.
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const minWorkerToBeCompleted = 5

type worker struct {
	sig                  *sync.Cond
	completedWorkerCount int // guarded by sig.L
}

func (w *worker) backgroundWork(id int) {
	waitTime := rand.Intn(10)
	time.Sleep(time.Duration(waitTime) * time.Second)
	w.sig.L.Lock()
	fmt.Printf("Background:: worker id(%d) completed\n", id)
	w.completedWorkerCount++
	w.sig.Broadcast() // only main waits, so sig.Signal() would also work
	w.sig.L.Unlock()
}

func (w *worker) runBackgroundWorkers(n int) {
	for i := 1; i <= n; i++ {
		go w.backgroundWork(i)
	}
}

func main() {
	w := &worker{
		sig: sync.NewCond(&sync.Mutex{}),
	}
	w.runBackgroundWorkers(10)

	w.sig.L.Lock()
	for w.completedWorkerCount < minWorkerToBeCompleted {
		w.sig.Wait() // releases the lock while waiting, reacquires it on wake-up
		fmt.Printf("mainGoroutine:: We have completed %d workers so far\n", w.completedWorkerCount)
	}
	completed := w.completedWorkerCount // read while still holding the lock
	w.sig.L.Unlock()
	fmt.Printf("mainGoroutine:: completed, with %d workers completed out of 10\n", completed)
}
A working example: https://go.dev/play/p/7ucK0d7hWNp
Upvotes: 0
Reputation: 1155
Are these y specific goroutines that you are trying to track, or any y out of the x? What are the criteria?
Update:
1. If you have control over the criteria for picking the matching y goroutines:
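You can call wg.Add(1) and wg.Done() only for the goroutines that match your condition, passing the WaitGroup into the goroutine as a pointer. Be careful where the Add happens: the sync documentation requires that an Add with a positive delta made while the counter is zero happen before Wait, so if you call wg.Add(1) inside the goroutine, wg.Wait() in main may return before any goroutine has called Add. Evaluate the condition, and call Add, before starting each goroutine; if the condition can only be known inside the goroutine, option 2 below is a better fit.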
Something like the sample code below. I can be more specific if you provide more details about what you are trying to do.
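package main

import (
	"fmt"
	"sync"
)

// condition1 is a placeholder for whatever selects the goroutines you care about.
func condition1(z int) bool {
	return z%2 == 0
}

func sampleGoroutine(z int, b string, tracked bool, wg *sync.WaitGroup) {
	if tracked {
		defer wg.Done()
	}
	// do stuff
	fmt.Println("goroutine", z, b)
}

func main() {
	x := 10
	var wg sync.WaitGroup
	for i := 0; i < x; i++ {
		tracked := condition1(i)
		if tracked {
			wg.Add(1) // Add before starting the goroutine, never inside it
		}
		go sampleGoroutine(i, "one", tracked, &wg)
	}
	wg.Wait() // waits only for the tracked goroutines
}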
2. If you have no control over which ones, and just want the first y:
Based on your comment, you have no control over (or desire to pick) specific goroutines; you just want the ones that finish first. To do this generically, you can use the custom wait-group implementation below, which fits your use case. (It is not copy-safe, though, and it does not have or need a wg.Add(int) method.)
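package main

// fmt, math/rand and time are used by the usage example that follows.
import (
	"fmt"
	"math/rand"
	"time"
)

type CountedWait struct {
	wait  chan struct{}
	limit int
}

func NewCountedWait(limit int) *CountedWait {
	return &CountedWait{
		wait:  make(chan struct{}, limit),
		limit: limit,
	}
}

// Done never blocks. If the buffer is already full, enough signals are
// pending to satisfy Wait, so the extra one is dropped instead of leaving
// a goroutine blocked forever on the send.
func (cwg *CountedWait) Done() {
	select {
	case cwg.wait <- struct{}{}:
	default:
	}
}

// Wait blocks until Done has been called limit times.
func (cwg *CountedWait) Wait() {
	for count := 0; count < cwg.limit; count++ {
		<-cwg.wait
	}
}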
It can be used as follows:
func sampleGoroutine(z int, b string, wg *CountedWait) {
	success := false
	defer func() {
		if success {
			fmt.Printf("goroutine %d finished successfully\n", z)
			wg.Done()
		}
	}()
	fmt.Printf("goroutine %d started\n", z)
	time.Sleep(time.Second)
	if rand.Intn(10)%2 == 0 {
		success = true
	}
}

func main() {
	x := 10
	y := 3
	wg := NewCountedWait(y)
	for i := 0; i < x; i++ {
		// Only goroutines that succeed call wg.Done()
		go sampleGoroutine(i, "something", wg)
	}
	// If fewer than y goroutines succeed, this blocks forever (the runtime
	// reports a deadlock once every other goroutine has exited).
	wg.Wait()
	fmt.Printf("%d out of %d goroutines finished successfully.\n", y, x)
}
3. You can also combine a context with option 2 to make sure the remaining goroutines don't leak.
You may not be able to run this on the Go Playground, as it has some long sleeps.
Below is a sample output. (Note that more than y=3 goroutines may mark themselves done, but you only wait until 3 have finished.)
goroutine 9 started
goroutine 0 started
goroutine 1 started
goroutine 2 started
goroutine 3 started
goroutine 4 started
goroutine 5 started
goroutine 5 marking done
goroutine 6 started
goroutine 7 started
goroutine 7 marking done
goroutine 8 started
goroutine 3 marking done
continuing after 3 out of 10 goroutines finished successfully.
goroutine 9 will be killed, bcz cancel
goroutine 8 will be killed, bcz cancel
goroutine 6 will be killed, bcz cancel
goroutine 1 will be killed, bcz cancel
goroutine 0 will be killed, bcz cancel
goroutine 4 will be killed, bcz cancel
goroutine 2 will be killed, bcz cancel
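Option 3 could be sketched roughly as follows. This is a self-contained approximation, not the code that produced the output above: it restates CountedWait with a non-blocking Done, and the names `work` and `firstY`, the random 0 to 49 ms task length, and the extra sync.WaitGroup used to confirm every goroutine has exited are all assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// CountedWait restates the type from option 2: Wait returns after limit
// calls to Done.
type CountedWait struct {
	wait  chan struct{}
	limit int
}

func NewCountedWait(limit int) *CountedWait {
	return &CountedWait{wait: make(chan struct{}, limit), limit: limit}
}

// Done never blocks: if the buffer is full, enough signals are already
// pending for Wait, so this one can be dropped.
func (cw *CountedWait) Done() {
	select {
	case cw.wait <- struct{}{}:
	default:
	}
}

// Wait blocks until Done has been called limit times.
func (cw *CountedWait) Wait() {
	for i := 0; i < cw.limit; i++ {
		<-cw.wait
	}
}

// work is a hypothetical task of random length that gives up as soon as
// ctx is cancelled. It reports whether the task finished.
func work(ctx context.Context, cw *CountedWait) bool {
	select {
	case <-time.After(time.Duration(rand.Intn(50)) * time.Millisecond):
		cw.Done()
		return true
	case <-ctx.Done():
		return false // result no longer needed
	}
}

// firstY starts x goroutines, continues once y of them have finished,
// cancels the rest and then waits until every goroutine has returned.
// It returns how many goroutines finished their work.
func firstY(x, y int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cw := NewCountedWait(y)
	var all sync.WaitGroup
	var mu sync.Mutex
	finished := 0
	for i := 0; i < x; i++ {
		all.Add(1)
		go func() {
			defer all.Done()
			if work(ctx, cw) {
				mu.Lock()
				finished++
				mu.Unlock()
			}
		}()
	}

	cw.Wait()
	fmt.Printf("continuing after %d out of %d goroutines finished\n", y, x)
	cancel()   // tell the remaining goroutines to stop
	all.Wait() // nothing is left running after this
	return finished
}

func main() {
	firstY(10, 3)
}
```

At least y goroutines always report success; any that finish between cw.Wait() returning and cancel() may push the count higher, which matches the note that more than y can mark themselves done.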
Upvotes: 1
Reputation: 5379
As noted in Adrian's answer, sync.WaitGroup is a simple counter whose Wait method will block until the counter value reaches zero. It is intended to allow you to block (or join) on a number of goroutines before allowing a main flow of execution to proceed.
The interface of WaitGroup is not sufficiently expressive for your use case, nor is it designed to be. In particular, you cannot use it naïvely by simply calling wg.Add(y) (where y < x). The call to wg.Done by the (y+1)th goroutine will cause a panic, as it is an error for a wait group to have a negative internal value. Furthermore, we cannot be "smart" by observing the internal counter value of the WaitGroup; this would break an abstraction and, in any event, its internal state is not exported.
You can implement the relevant logic yourself using a channel, as in the code below. Observe from the console that 10 goroutines are started, but after two have completed, we fall through and continue execution in the main function.
package main

import (
	"fmt"
	"time"
)

// Set goroutine counts here
const (
	// The number of goroutines to spawn
	x = 10
	// The number of goroutines to wait for completion
	// (y <= x) must hold.
	y = 2
)

func doSomeWork() {
	// do something meaningful
	time.Sleep(time.Second)
}

func main() {
	// Accumulator channel, used by each goroutine to signal completion.
	// It is buffered to ensure the [y+1, ..., x) goroutines do not block
	// when sending to the channel, which would cause a leak. It will be
	// garbage collected when all goroutines end and the channel falls
	// out of scope. We receive y values, so only need capacity to receive
	// (x-y) remaining values.
	accChan := make(chan struct{}, x-y)

	// Spawn "x" goroutines
	for i := 0; i < x; i++ {
		// Wrap our work function with the local signalling logic
		go func(id int, doneChan chan<- struct{}) {
			fmt.Printf("starting goroutine #%d\n", id)
			doSomeWork()
			fmt.Printf("goroutine #%d completed\n", id)
			// Communicate completion of goroutine
			doneChan <- struct{}{}
		}(i, accChan)
	}

	for doneCount := 0; doneCount < y; doneCount++ {
		<-accChan
	}

	// Continue working
	fmt.Println("Carrying on without waiting for more goroutines")
}
As this does not wait for the [y+1, ..., x) goroutines to complete, you should take special care in the doSomeWork function to remove or minimize the risk that the work can block indefinitely, which would also cause a leak. Remove, where possible, the feasibility of indefinite blocking on I/O (including channel operations) or falling into infinite loops.
You could use a context to signal to the additional goroutines that their results are no longer required, so that they break out of execution.
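A minimal sketch of that idea, assuming doSomeWork can be changed to accept a context.Context and a duration (that signature, the `waitForY` name and the id*10 ms task lengths are hypothetical). After y completions, main cancels the context; a sync.WaitGroup is then used only to confirm that every goroutine has actually returned.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// doSomeWork is a stand-in for real work that takes d to complete but
// returns ctx.Err() early if ctx is cancelled first.
func doSomeWork(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitForY starts x goroutines, carries on once y have completed, cancels
// the others and waits for all of them to exit. It returns how many
// goroutines were cancelled before finishing their work.
func waitForY(x, y int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	accChan := make(chan struct{}, x-y) // same sizing argument as in the answer
	var exited sync.WaitGroup
	var mu sync.Mutex
	cancelled := 0
	for i := 0; i < x; i++ {
		exited.Add(1)
		go func(id int) {
			defer exited.Done()
			// goroutine id needs id*10ms, so lower ids tend to finish first
			if err := doSomeWork(ctx, time.Duration(id)*10*time.Millisecond); err != nil {
				mu.Lock()
				cancelled++
				mu.Unlock()
				return
			}
			accChan <- struct{}{}
		}(i)
	}

	for doneCount := 0; doneCount < y; doneCount++ {
		<-accChan
	}
	fmt.Println("Carrying on; cancelling the remaining goroutines")
	cancel()
	exited.Wait() // every goroutine has returned, none is leaked
	return cancelled
}

func main() {
	fmt.Println("cancelled:", waitForY(10, 2))
}
```

Cancellation only starts after y values have been received, so main can never be left waiting for a send that will not come; cancelled goroutines simply skip the send.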
Upvotes: 5
Reputation: 46442
WaitGroup doesn't actually wait on goroutines; it waits until its internal counter reaches zero. If you only Add() the number of goroutines you care about, and you only call Done() in those goroutines, then Wait() will only block until those goroutines have finished. You are in complete control of the logic and flow; there are no restrictions on what WaitGroup "allows".
Upvotes: 2