o0omycomputero0o
o0omycomputero0o

Reputation: 3554

"WaitGroup is reused before previous Wait" panic for an unknown reason

I use the following code, but I don't know why it crashes with the error (WaitGroup is reused before previous Wait) at this line:

for _, proxy := range proxies {
                    wgGroup.Wait()

I want to ensure that while a local source is calling proxySource.GetProxies() or sending with proxyProvider.receivingProxyBC.In() <- proxy, the remoteSources goroutines are not allowed to call proxyProvider.receivingProxyBC.In() <- proxy.

Detail code here:

    // Original attempt: wgGroup is used as a gate so that remote sources wait
    // while local sources are fetching or pushing proxies.
    // NOTE(review): sync.WaitGroup is not a lock.
    //   - The Add(1) calls inside the local goroutines can run while the
    //     counter is zero and a remote goroutine is blocked in wgGroup.Wait().
    //   - The sync docs require positive-delta Adds made at counter zero to
    //     happen before Wait.
    //   - Breaking that rule is what produces the
    //     "WaitGroup is reused before previous Wait" panic.
    //   - Wait also provides no mutual exclusion: a local goroutine can call
    //     Add(1) right after a remote goroutine has returned from Wait.
    wgGroup := sync.WaitGroup{}
    // One count per local goroutine. Each goroutine drops its count as soon
    // as it starts, before doing any fetching.
    wgGroup.Add(len(localSources))
    for _, proxySource := range localSources {
        go func(proxySource *ProxySource) {
            lastTimeGet := time.Now()
            firstTimeLoad := true
            // Releases the startup count immediately; it does not mark any
            // fetching work as finished.
            wgGroup.Done()
            // Poll forever: skip a round when there are more than 200 proxies,
            // the last fetch is recent, and this is not the first load.
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                // NOTE(review): the counter may be zero here while remote
                // goroutines are inside Wait(); this Add is the misuse that
                // can panic.
                wgGroup.Add(1)
                proxies, err := proxySource.GetProxies()
                wgGroup.Done()
                LogInfo("Get proxy from source ", proxySource.Id)
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                // NOTE(review): same concurrent-Add problem as above.
                wgGroup.Add(1)
                for _, proxy := range proxies {
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                wgGroup.Done()
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }
    for _, proxySource := range remoteSources {
        go func(proxySource *ProxySource) {
            time.Sleep(2 * time.Second)
            lastTimeGet := time.Now()
            firstTimeLoad := true
            // Same polling loop as the local sources, with a threshold of 100.
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                proxies, err := proxySource.GetProxies()
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                for _, proxy := range proxies {
                    // Blocks only while some local goroutine is between Add(1)
                    // and Done(). It cannot stop a local goroutine from starting
                    // a new fetch or push right after Wait returns.
                    wgGroup.Wait()
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }

UPDATE RWLOCK

Using this code I can lock localSources, but it does not seem optimal. What I need is this:
- while any localSources goroutine is getting proxies, all remoteSources are locked;
- when no localSources goroutine is getting, all remoteSources are allowed to get concurrently.

Currently, only one remoteSources goroutine is allowed to get at a time.

// RWMutex revision of the question's code.
// Local sources take the read lock (shared among locals); remote sources take
// the write lock (exclusive).
// NOTE(review): these roles are the inverse of the stated goal.
//   - Because remotes call Lock(), only one remote source can fetch or push at
//     a time, and each remote also excludes every local source.
//   - If serializing the local sources among themselves is acceptable,
//     swapping the roles (locals Lock, remotes RLock) lets remotes run
//     concurrently while any local source excludes all of them.
wgGroup := sync.WaitGroup{}
// The only Add in this snippet: one count per local goroutine, released by
// each goroutine's first Done() call.
wgGroup.Add(len(localSources))
localGroupRwLock := sync.RWMutex{}
for _, proxySource := range localSources {
  go func(proxySource *ProxySource) {
    lastTimeGet := time.Now()
    firstTimeLoad := true
    wgGroup.Done()
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      // Shared lock: several local sources may fetch at once. A remote
      // source holding Lock() blocks them.
      localGroupRwLock.RLock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.RUnlock()
      LogInfo("Get proxy from source ", proxySource.Id)
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      // Shared lock held for the whole batch push.
      localGroupRwLock.RLock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.RUnlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}
for _, proxySource := range remoteSources {
  go func(proxySource *ProxySource) {
    time.Sleep(2 * time.Second)
    lastTimeGet := time.Now()
    firstTimeLoad := true
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      LogInfo("Get proxy from source ", proxySource.Id)
      // Exclusive lock across GetProxies(): while this remote fetches, every
      // local source and every other remote source is blocked. This is why
      // only one remote source can get at a time.
      localGroupRwLock.Lock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.Unlock()
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      // NOTE(review): in this snippet wgGroup only counts the startup Done()
      // of each local goroutine. After startup this Wait returns immediately
      // and provides no exclusion.
      wgGroup.Wait()
      localGroupRwLock.Lock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.Unlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}

Upvotes: 1

Views: 1249

Answers (1)

leaf bebop
leaf bebop

Reputation: 8232

From document:

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

and for Wait() :

Wait blocks until the WaitGroup counter is zero.

You can also see examples there. The point is that a WaitGroup is used for blocking until the counter gets to zero. So in the original code, assuming no runtime error, every goroutine in the second for loop would block until the goroutines in the first loop are done. In the first loop, the Add(1) and Done() calls would not block anything at all, so the data race would remain.

The error itself is documented in the Add() method: Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.

Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned. See the WaitGroup example.

However, you are not waiting for independent sets of events either.

The tool fitting for your code is sync.Mutex. Document, again:

A Mutex is a mutual exclusion lock. The zero value for a Mutex is an unlocked mutex.

A Mutex must not be copied after first use.

type Mutex struct {
    // contains filtered or unexported fields
}

func (*Mutex) Lock

func (m *Mutex) Lock()

Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available.

func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.

So, as you describe, you want to "pause the calling of proxyProvider.receivingProxyBC.In() <- proxy when proxySource.GetProxies() or for _, proxy := range proxies is called". "Pause" is better described by the term "block", and that is a textbook mutex problem: guard all three "calls" (strictly speaking, a for loop is not a call) with locks and it is done.

Guarding a for loop with a mutex is a little tricky; it should look like this:

// Keep the lock held while the loop header advances; release it only around
// the work done in each iteration.
lock.Lock()
for ... {
    lock.Unlock()
    ...
    lock.Lock()
}
// The lock is still held when the loop exits (and after an empty loop),
// so release it here or every later lock.Lock() blocks forever.
lock.Unlock()

So I changed your code; hopefully it works as expected:

// lock serializes three kinds of work:
//   - a local source's GetProxies() call,
//   - a local source pushing its whole batch into receivingProxyBC,
//   - each single push made by a remote source.
// While a local source holds it, remote sources cannot push.
lock := sync.Mutex{}
// The range loop runs with the lock held; it is released only around each
// `go` statement, so it is still held when the loop finishes.
lock.Lock()
for _, proxySource := range localSources {
    lock.Unlock()
    go func(proxySource *ProxySource) {
        // Only goroutine-private variables are initialized here; the lock is
        // not needed to protect them.
        lock.Lock()
        lastTimeGet := time.Now()
        firstTimeLoad := true
        lock.Unlock()
        for {
            currentTimeGet := time.Now()
            totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
            // Skip this round when there are already more than 200 proxies,
            // the last fetch is recent, and this is not the first load.
            if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                time.Sleep(proxySource.WatchWait)
                continue
            }
            firstTimeLoad = false
            // Fetch under the lock so no remote source pushes meanwhile.
            lock.Lock()
            proxies, err := proxySource.GetProxies()
            lock.Unlock()
            LogInfo("Get proxy from source ", proxySource.Id)
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            // Push the whole batch under one lock acquisition so remote
            // sources cannot interleave their proxies with it.
            // NOTE(review): if the In() channel send blocks, the lock stays
            // held until the value is accepted.
            lock.Lock()
            for _, proxy := range proxies {
                proxyProvider.receivingProxyBC.In() <- proxy
            }
            lock.Unlock()
            lastTimeGet = time.Now()
            time.Sleep(20 * time.Second)
        }
    }(proxySource)
    lock.Lock()
}
// Release the lock that is still held after the final iteration (or that was
// taken just before an empty loop). Without this Unlock the mutex stays
// locked forever, and every goroutine blocks as soon as it next calls
// lock.Lock().
lock.Unlock()
for _, proxySource := range remoteSources {
    go func(proxySource *ProxySource) {
        // Initial delay before the first remote fetch.
        time.Sleep(2 * time.Second)
        lastTimeGet := time.Now()
        firstTimeLoad := true
        for {
            currentTimeGet := time.Now()
            totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
            if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                time.Sleep(proxySource.WatchWait)
                continue
            }
            firstTimeLoad = false
            // Remote fetches run without the lock.
            proxies, err := proxySource.GetProxies()
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            // Each push is guarded individually, so it waits while any local
            // source holds the lock.
            for _, proxy := range proxies {
                lock.Lock()
                proxyProvider.receivingProxyBC.In() <- proxy
                lock.Unlock()
            }
            lastTimeGet = time.Now()
            time.Sleep(20 * time.Second)
        }
    }(proxySource)
}

Note 1: You may be tempted to use defer. Don't: a deferred call runs when the surrounding function returns, not at the end of a block.

Note 2: Using a mutex in Go often raises a design question. You should always check whether it would be better to use channels and refactor the code, though in many cases a mutex is not a bad idea. I cannot see anything about the overall design here, so I will leave it at that.

Note 3: The code actually also pauses proxySource.GetProxies() and the for loop while proxyProvider.receivingProxyBC.In() <- proxy is being called. Whether this is desired depends on your needs. If it is not, look at sync.RWMutex and adapt the code accordingly. I will leave that to you.

Upvotes: 2

Related Questions