Reputation: 3554
I use the following code, but I don't understand why it crashes with the error "WaitGroup is reused before previous Wait" at these lines:
for _, proxy := range proxies {
wgGroup.Wait()
I want to ensure that while a local source is calling proxySource.GetProxies() and proxyProvider.receivingProxyBC.In() <- proxy, the remoteSources are not allowed to call proxyProvider.receivingProxyBC.In() <- proxy.
Full code:
// Question code, first version: wgGroup is (mis)used as a lock between the
// local-source goroutines and the remote-source goroutines.
wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
for _, proxySource := range localSources {
go func(proxySource *ProxySource) {
lastTimeGet := time.Now()
firstTimeLoad := true
// Done is called right at start-up, so the initial Add(len(localSources))
// only covers goroutine start-up, not any fetching.
wgGroup.Done()
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
// NOTE(review): this Add(1) can run while the counter is zero and a remote
// goroutine is blocked in wgGroup.Wait(). The sync.WaitGroup docs require a
// positive Add from zero to happen before a Wait, so this is what triggers
// the "WaitGroup is reused before previous Wait" panic.
wgGroup.Add(1)
proxies, err := proxySource.GetProxies()
wgGroup.Done()
LogInfo("Get proxy from source ", proxySource.Id)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
wgGroup.Add(1)
for _, proxy := range proxies {
proxyProvider.receivingProxyBC.In() <- proxy
}
wgGroup.Done()
lastTimeGet = time.Now()
time.Sleep(20 * time.Second)
}
}(proxySource)
}
for _, proxySource := range remoteSources {
go func(proxySource *ProxySource) {
time.Sleep(2 * time.Second)
lastTimeGet := time.Now()
firstTimeLoad := true
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
proxies, err := proxySource.GetProxies()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, proxy := range proxies {
// Wait only blocks while the counter is non-zero. A local goroutine can
// call Add(1) right after Wait returns, so this gives no mutual exclusion
// with the local sources.
wgGroup.Wait()
proxyProvider.receivingProxyBC.In() <- proxy
}
lastTimeGet = time.Now()
time.Sleep(20 * time.Second)
}
}(proxySource)
}
UPDATE (RWMutex)
With the code below I can lock around the localSources, but it does not seem optimal. What I need: while any localSources is fetching, all remoteSources must be locked out; when no localSources is fetching, all remoteSources should be allowed to fetch concurrently. Currently, only one remoteSources goroutine is allowed to fetch at a time.
// Question code, second version: local sources take the read lock and
// remote sources take the write lock of localGroupRwLock.
wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
localGroupRwLock := sync.RWMutex{}
for _, proxySource := range localSources {
go func(proxySource *ProxySource) {
lastTimeGet := time.Now()
firstTimeLoad := true
wgGroup.Done()
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
LogInfo("Total proxies ", totalProxy)
if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
// RLock: local sources may hold this concurrently with each other, but
// not while a remote source holds the exclusive Lock().
localGroupRwLock.RLock()
proxies, err := proxySource.GetProxies()
localGroupRwLock.RUnlock()
LogInfo("Get proxy from source ", proxySource.Id)
if err != nil {
LogError("Error when get proxies from ", proxySource.Id)
time.Sleep(5 * time.Second)
continue
}
LogInfo("Add proxy from source ", proxySource.Id)
localGroupRwLock.RLock()
for _, proxy := range proxies {
proxyProvider.receivingProxyBC.In() <- proxy
}
localGroupRwLock.RUnlock()
LogInfo("Done add proxy from source ", proxySource.Id)
//LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
lastTimeGet = time.Now()
time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
LogInfo("Watch for proxy source", proxySource.Id)
}
}(proxySource)
}
for _, proxySource := range remoteSources {
go func(proxySource *ProxySource) {
time.Sleep(2 * time.Second)
lastTimeGet := time.Now()
firstTimeLoad := true
for {
currentTimeGet := time.Now()
totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
LogInfo("Total proxies ", totalProxy)
if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
time.Sleep(proxySource.WatchWait)
continue
}
firstTimeLoad = false
LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
LogInfo("Get proxy from source ", proxySource.Id)
// Exclusive Lock: this shuts out the local sources AND every other remote
// source, which is why remote fetches currently run one at a time.
localGroupRwLock.Lock()
proxies, err := proxySource.GetProxies()
localGroupRwLock.Unlock()
if err != nil {
LogError("Error when get proxies from ", proxySource.Id)
time.Sleep(5 * time.Second)
continue
}
LogInfo("Add proxy from source ", proxySource.Id)
// In this version wgGroup only receives Add(len(localSources)) and one
// Done() per local goroutine at start-up, so this Wait just waits for all
// local goroutines to have started.
wgGroup.Wait()
localGroupRwLock.Lock()
for _, proxy := range proxies {
proxyProvider.receivingProxyBC.In() <- proxy
}
localGroupRwLock.Unlock()
LogInfo("Done add proxy from source ", proxySource.Id)
//LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
lastTimeGet = time.Now()
time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
LogInfo("Watch for proxy source", proxySource.Id)
}
}(proxySource)
}
Upvotes: 1
Views: 1249
Reputation: 8232
From the documentation:
A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.
and for Wait():
Wait blocks until the WaitGroup counter is zero.
You can also see examples there. The point is that a WaitGroup is used for blocking until its counter gets to zero. So in the original code, assuming no runtime error, every goroutine in the second for loop blocks until the counter drops to zero, i.e. until the goroutines in the first loop have called Done(). And in the first part, the Add(1) and Done() calls do not block at all, so the data race remains.
The error itself is documented in the Add() method:
Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.
Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned. See the WaitGroup example.
However, you are not waiting for independent sets of events either.
The right tool for your code is sync.Mutex. From the documentation, again:
A Mutex is a mutual exclusion lock. The zero value for a Mutex is an unlocked mutex.
A Mutex must not be copied after first use.
type Mutex struct { // contains filtered or unexported fields }
func (*Mutex) Lock
func (m *Mutex) Lock()
Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available.
func (*Mutex) Unlock
func (m *Mutex) Unlock()
Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.
So, as you describe, you want to "pause the calling of proxyProvider.receivingProxyBC.In() <- proxy when proxySource.GetProxies() or for _, proxy := range proxies is called". "Pause" is better described by the term "block", and that is a textbook mutex problem: guard all three "calls" (strictly speaking, a for loop is not a call) with the same lock and you are done.
It might be a little tricky to guard a for loop with a mutex; it should look like this:
// Hold the lock while the loop header is evaluated; release it for the body.
lock.Lock()
for ... {
	lock.Unlock()
	// ... loop body runs without the lock ...
	lock.Lock()
}
// The lock is re-acquired before the final (failing) loop-condition check,
// so it must be released once the loop exits.
lock.Unlock()
So I changed your code; hopefully it works as expected:
// lock is shared by every source goroutine. Local sources hold it around
// GetProxies() and around their whole push loop; remote sources hold it
// around each single push. A remote push therefore never interleaves with a
// local fetch or push in progress, but local sources also serialize with
// each other.
var lock sync.Mutex

// The range over localSources uses the "guard a for loop" pattern: the lock
// is held while the loop header is evaluated and released for the body.
lock.Lock()
for _, proxySource := range localSources {
	lock.Unlock()
	go func(proxySource *ProxySource) {
		// lastTimeGet and firstTimeLoad belong to this goroutine alone, so
		// they need no locking.
		lastTimeGet := time.Now()
		firstTimeLoad := true
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			lock.Lock()
			proxies, err := proxySource.GetProxies()
			lock.Unlock()
			LogInfo("Get proxy from source ", proxySource.Id)
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
			// NOTE(review): if In() returns an unbuffered or full channel,
			// these sends block while holding lock and stall every other
			// source until a receiver drains it.
			lock.Lock()
			for _, proxy := range proxies {
				proxyProvider.receivingProxyBC.In() <- proxy
			}
			lock.Unlock()
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second)
		}
	}(proxySource)
	lock.Lock()
}
// Release the lock that is still held when the loop exits (also the case
// when localSources is empty). Without this the main goroutine keeps lock
// forever and every source goroutine blocks on its first lock.Lock().
lock.Unlock()

for _, proxySource := range remoteSources {
	go func(proxySource *ProxySource) {
		time.Sleep(2 * time.Second)
		lastTimeGet := time.Now()
		firstTimeLoad := true
		for {
			currentTimeGet := time.Now()
			totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
			if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
				time.Sleep(proxySource.WatchWait)
				continue
			}
			firstTimeLoad = false
			proxies, err := proxySource.GetProxies()
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
			for _, proxy := range proxies {
				// Lock per push so a remote push cannot overlap a local
				// GetProxies() call or a local push loop.
				lock.Lock()
				proxyProvider.receivingProxyBC.In() <- proxy
				lock.Unlock()
			}
			lastTimeGet = time.Now()
			time.Sleep(20 * time.Second)
		}
	}(proxySource)
}
Note 1: You may be tempted to use defer. Don't: a deferred call runs when the surrounding function returns, not at the end of a block.
Note 2: Using a mutex in Go often raises a design question. It is always worth checking whether it would be better to use channels and refactor the code, though in many cases a mutex is not a bad idea. But I cannot tell anything about the design from here, so I will leave it at that.
Note 3: The code actually also pauses proxySource.GetProxies() and the for loop while proxyProvider.receivingProxyBC.In() <- proxy is being called. Whether this is desired depends on your requirements. If it is not, look at sync.RWMutex and adapt the code accordingly; I will leave that to you.
Upvotes: 2