Reputation: 59
I have the following code, where I am trying to call an API 100,000 times, but I am getting errors:
package main

import (
	"fmt"
	"net/http"
	"runtime"
	"sync"
	"time"
)

func main() {
	nCPU := runtime.NumCPU()
	runtime.GOMAXPROCS(nCPU)
	var wg sync.WaitGroup
	totalRequests := 100000
	wg.Add(totalRequests)
	fmt.Println("Starting Go Routines")
	start := time.Now()
	total := 0
	for i := 0; i < totalRequests; i++ {
		go func(current int) {
			defer wg.Done()
			startFunc := time.Now()
			_, err := http.Get("http://127.0.0.1:8080/event/list")
			// resp, err := http.Get("https://graph.facebook.com/v2.4/me" + "?fields=id%2Cname&access_token=" + "CAACEdEose0cBAEpQvcsvVMQu5oZCyyDjcEPQi9yCdiXimm4F0AYexGHPZAJHgpyrFOJN5X1VMcicNJjlkaCquUqHMZAfRrtxx6K9cRIROrA0OmbqAqCcg8ZA3qJZCHCl68I1n4LtFb5qxPcotlP5ne5PBakK0OUa7sc6FAOWwByOnFtNZBpIe8XDeM4YFa33sDfftVUpZCoBgZDZD")
			if err != nil {
				fmt.Println(err)
			}
			// defer resp.Body.Close()
			elapsedFunc := time.Since(startFunc)
			fmt.Println("The request (", current, ") took", elapsedFunc, "No of requests completed", total)
			total++
		}(i)
	}
	wg.Wait()
	elapsed := time.Since(start)
	fmt.Println("\nThe total time with cores", elapsed)
	fmt.Println("\nTerminating Program")
}
The errors I am getting:
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 5390 ) took 1.619876633s No of requests completed 2781
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 7348 ) took 650.609825ms No of requests completed 1445
Upvotes: 1
Views: 2834
Reputation: 883
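As others mentioned in the comments, your main issue is that you are exceeding the open file limit of the process. Every in-flight request holds a socket, and sockets count against that limit, so starting 100000 goroutines at once exhausts it.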
You can easily implement a semaphore using channels to limit concurrency:
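// Fragment: goes inside main and needs the fmt, sync/atomic, and time imports.
totalRequests := 100000
concurrency := 1024
sem := make(chan bool, concurrency) // buffered channel acting as a semaphore

start := time.Now()
total := int32(0)

for i := 0; i < totalRequests; i++ {
	sem <- true // acquire a slot; blocks while concurrency requests are in flight
	go func(current int) {
		startTime := time.Now()
		// Make request here
		elapsedTime := time.Since(startTime)
		done := atomic.AddInt32(&total, 1) // AddInt32 returns the updated count
		fmt.Printf("Request %d took %s. Requests completed: %d\n", current, elapsedTime, done)
		<-sem // release the slot
	}(i)
}

// Filling the channel to capacity succeeds only after every goroutine has released its slot.
for i := 0; i < cap(sem); i++ {
	sem <- true
}

elapsedTotal := time.Since(start)
fmt.Printf("\nTotal time elapsed: %s\n", elapsedTotal)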
This will limit the number of parallel requests to whatever is specified in concurrency.
As you can see, the total variable is incremented using the atomic package, since it is modified from goroutines that may run in parallel. The plain total++ in your code is a data race and can produce an incorrect count.
See this blog post for the original example & explanation of limiting concurrency in Go: http://jmoiron.net/blog/limiting-concurrency-in-go
EDIT:
As mentioned by JimB below, another common approach is to start concurrency goroutines that do the work and feed jobs to them over a channel. Here's a generic do function that one might use for this:
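// do calls fn(i) for every i in [0, total) using concurrency worker goroutines.
// It needs the sync import.
func do(total, concurrency int, fn func(int)) {
	workQueue := make(chan int, concurrency)

	var wg sync.WaitGroup
	wg.Add(concurrency)

	// Start the workers; each one pulls jobs until workQueue is closed.
	for i := 0; i < concurrency; i++ {
		go func() {
			for i := range workQueue {
				fn(i)
			}
			wg.Done()
		}()
	}

	// Feed the jobs; the send blocks whenever all workers are busy and the buffer is full.
	for i := 0; i < total; i++ {
		workQueue <- i
	}
	close(workQueue)
	wg.Wait()
}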
We spawn concurrency goroutines and then send values to the workQueue channel until total values have been sent. Closing the workQueue channel terminates the range loops in the goroutines once the queue has been drained. After that we wait until the remaining goroutines finish running.
For the use case in question, it could be used like this:
totalRequests := 100000
concurrency := 1024

do(totalRequests, concurrency, func(i int) {
	// Make request here
	fmt.Printf("Request %d done.\n", i)
})
Upvotes: 5