Shashank pai k

How to run 10000 goroutines in parallel where each goroutine calls an API?

I have the following code, in which I am trying to call an API 10000 times, but I am getting errors:

package main

import (
    "fmt"

    "net/http"
    "runtime"
    "sync"
    "time"
)

func main() {

    nCPU := runtime.NumCPU()
    runtime.GOMAXPROCS(nCPU)

    var wg sync.WaitGroup
    totalRequests := 100000
    wg.Add(totalRequests)

    fmt.Println("Starting Go Routines")

    start := time.Now()
    total := 0

    for i := 0; i < totalRequests; i++ {

        go func(current int) {
            defer wg.Done()

            startFunc := time.Now()
            _, err := http.Get("http://127.0.0.1:8080/event/list")
            // resp, err := http.Get("https://graph.facebook.com/v2.4/me" + "?fields=id%2Cname&access_token=" + "CAACEdEose0cBAEpQvcsvVMQu5oZCyyDjcEPQi9yCdiXimm4F0AYexGHPZAJHgpyrFOJN5X1VMcicNJjlkaCquUqHMZAfRrtxx6K9cRIROrA0OmbqAqCcg8ZA3qJZCHCl68I1n4LtFb5qxPcotlP5ne5PBakK0OUa7sc6FAOWwByOnFtNZBpIe8XDeM4YFa33sDfftVUpZCoBgZDZD")

            if err != nil {
                fmt.Println(err)
            }
            // defer resp.Body.Close()
            elapsedFunc := time.Since(startFunc)
            fmt.Println("The request (", current, ") took", elapsedFunc, "No of requests completed", total)
            total++

        }(i)

    }

    wg.Wait()
    elapsed := time.Since(start)
    fmt.Println("\nThe total time with cores", elapsed)
    fmt.Println("\nTerminating Program")
}

The errors I am getting:

Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 5390 ) took 1.619876633s No of requests completed 2781
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 7348 ) took 650.609825ms No of requests completed 1445

Answers (1)

Antony Mativos

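As others mentioned in the comments, your main issue is that you are exceeding the process's limit on open files. Every in-flight HTTP request holds a TCP socket, and each socket uses a file descriptor, so starting all the requests at once (and never closing resp.Body) exhausts them.

You can implement a semaphore with a buffered channel to limit concurrency: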

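package main

import (
    "fmt"
    "io"
    "net/http"
    "sync/atomic"
    "time"
)

func main() {
    totalRequests := 100000
    concurrency := 1024
    // Buffered channel used as a counting semaphore: at most
    // `concurrency` goroutines can hold a slot at the same time.
    sem := make(chan bool, concurrency)

    start := time.Now()
    total := int32(0)

    for i := 0; i < totalRequests; i++ {
        sem <- true // acquire a slot; blocks while `concurrency` requests are in flight

        go func(current int) {
            defer func() { <-sem }() // release the slot, even on error
            startTime := time.Now()

            resp, err := http.Get("http://127.0.0.1:8080/event/list")
            if err != nil {
                fmt.Println(err)
                return
            }
            // Drain and close the body so the connection and its socket are released.
            io.Copy(io.Discard, resp.Body)
            resp.Body.Close()

            elapsedTime := time.Since(startTime)
            n := atomic.AddInt32(&total, 1) // AddInt32 returns the new value
            fmt.Printf("Request %d took %s. Requests completed: %d\n", current, elapsedTime, n)
        }(i)
    }

    // Filling the semaphore to capacity only succeeds once every goroutine
    // has released its slot, so this loop waits for all requests to finish.
    for i := 0; i < cap(sem); i++ {
        sem <- true
    }
    elapsedTotal := time.Since(start)
    fmt.Printf("\nTotal time elapsed: %s\n", elapsedTotal)
}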

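This limits the number of in-flight requests to the value of concurrency. The final loop doubles as the wait: it can only fill the channel to capacity after every goroutine has released its slot. Choose concurrency comfortably below your open-file limit (check it with ulimit -n); on a system where the soft limit is 1024, a concurrency of 1024 can still run out of descriptors.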

Note that total is incremented with the sync/atomic package. It is modified from goroutines that may run in parallel, and an unsynchronized increment like the total++ in your code is a data race that can lose updates and produce an incorrect count.
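
To make the point about the counter concrete, here is a small sketch comparing two race-free ways to count from many goroutines: sync/atomic and a sync.Mutex. countConcurrently is a hypothetical helper for illustration. Running the original total++ version with go run -race should make the race detector report the problem.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countConcurrently starts n goroutines that each increment two counters:
// one through sync/atomic and one guarded by a sync.Mutex. Both approaches
// avoid the data race of a bare `total++`.
func countConcurrently(n int) (atomicTotal int32, mutexTotal int) {
	var wg sync.WaitGroup
	var mu sync.Mutex

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt32(&atomicTotal, 1) // lock-free increment

			mu.Lock()
			mutexTotal++ // safe: only one goroutine holds mu at a time
			mu.Unlock()
		}()
	}
	wg.Wait() // every increment happens before this returns
	return atomic.LoadInt32(&atomicTotal), mutexTotal
}

func main() {
	a, m := countConcurrently(10000)
	fmt.Println(a, m) // prints "10000 10000"
}
```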

See this blog post for the original example and an explanation of limiting concurrency in Go: http://jmoiron.net/blog/limiting-concurrency-in-go

EDIT:

As JimB mentioned in the comments below, another common approach is to start a fixed number (concurrency) of worker goroutines and feed them work over a channel. Here is a generic do function for this:

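func do(total, concurrency int, fn func(int)) {
    // Buffered channel of job indexes shared by all workers.
    workQueue := make(chan int, concurrency)

    var wg sync.WaitGroup
    wg.Add(concurrency)

    // Start a fixed pool of workers; each one processes jobs until
    // workQueue is closed and drained.
    for w := 0; w < concurrency; w++ {
        go func() {
            defer wg.Done()
            for job := range workQueue {
                fn(job)
            }
        }()
    }

    // Feed every job index to the pool, then signal that no more work is coming.
    for i := 0; i < total; i++ {
        workQueue <- i
    }
    close(workQueue)

    // Block until every worker has returned.
    wg.Wait()
}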

We spawn concurrency worker goroutines and then send the values 0 through total-1 on the workQueue channel. Closing workQueue ends the range loop in each worker once the remaining values have been consumed, and wg.Wait() then blocks until every worker has finished.

For the use case in question, it could be used like this:

totalRequests := 100000
concurrency := 1024

do(totalRequests, concurrency, func(i int) {
    // Make request here

    fmt.Printf("Request %d done.\n", i)
})
