user12517276

Reputation: 81

Go net/http leaks memory under high load

I am developing an API that calls client URLs using the net/http package. For each incoming request (a POST call), between 1 and 8 URLs are called concurrently in goroutines, depending on the user's country/OS. The app works at a low load of around 1000-1500 qps, but when I scale it to about 3k, memory increases suddenly, even if only 1 client URL is called, and the app stops responding after a few minutes (response time well above 50 sec). I am using Go's native net/http package along with the gorilla/mux router. Other questions on this issue say to close the response body, which I have done using

        req, err := http.NewRequest("POST", "client_post_url", bytes.NewBuffer(requestBody))
        req.Header.Set("Content-Type", "application/json")
        req.Header.Set("Connection", "Keep-Alive")
        response, err := common.Client.Do(req)
        status := 0
        if err != nil {
            //handle and return
        }
        defer response.Body.Close() //used with/without io.Copy
        status = response.StatusCode
        body, _ := ioutil.ReadAll(response.Body)
        _, err = io.Copy(ioutil.Discard, response.Body)

I need to reuse connections, so I made the HTTP client and transport global variables, initialized in the init function like this:

    common.Transport = &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
        DialContext: (&net.Dialer{
            //Timeout: time.Duration(300) * time.Millisecond,
            KeepAlive: 30 * time.Second,
        }).DialContext,
        //ForceAttemptHTTP2:     true,
        DisableKeepAlives: false,
        //MaxIdleConns:      0,
        //IdleConnTimeout:   0,
        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
        //ExpectContinueTimeout: 1 * time.Second,
    }

    common.Client = &http.Client{
        Timeout:   time.Duration(300) * time.Millisecond,
        Transport: common.Transport,
    }

I have read that keep-alive can cause memory leaks, and I have tried a few combinations of disabling keep-alive and setting the close flag on the request, but nothing seems to work. Also, if I make no HTTP call and instead use time.Sleep(300 * time.Millisecond) in the goroutines that would call each URL concurrently, the app works without any leak. So I am fairly sure it has something to do with the client/net/http package: under high load, connections are not released or not reused properly.

What should my approach be? Would creating a custom server and a custom handler type to accept and route requests work, as mentioned for the C10K approach in several articles? I can share sample code with all the details if required; above I have added only the part where I feel the issue lies.

This is representative code:

main.go

package main

import (
    "./common"
    "bytes"
    "crypto/tls"
    "fmt"
    "github.com/gorilla/mux"
    "io"
    "io/ioutil"
    "log"
    "math/rand"
    "net"
    "net/http"
    "net/http/pprof"
    "os"
    "runtime"
    "strconv"
    "sync"
    "time"
)

func init() {

    //Get Any command line argument passed
    args := os.Args[1:]
    numCPU := runtime.NumCPU()
    if len(args) > 1 {
        numCPU, _ = strconv.Atoi(args[0])
    }

    common.Transport = &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
        DialContext: (&net.Dialer{
            //Timeout: time.Duration() * time.Millisecond,
            KeepAlive: 30 * time.Second,
        }).DialContext,
        //ForceAttemptHTTP2:     true,
        DisableKeepAlives: false,
        //MaxIdleConns:      0,
        //IdleConnTimeout:   0,
        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
        //ExpectContinueTimeout: 1 * time.Second,
    }

    common.Client = &http.Client{
        Timeout:   time.Duration(300) * time.Millisecond,
        Transport: common.Transport,
    }

    runtime.GOMAXPROCS(numCPU)

    rand.Seed(time.Now().UTC().UnixNano())
}

func main() {

    router := mux.NewRouter().StrictSlash(true)
    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        _, _ = fmt.Fprintf(w, "Hello!!!")
    })

    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)

        prepareRequest(w, r, vars["name"])

    }).Methods("POST")

    // Register pprof handlers
    router.HandleFunc("/debug/pprof/", pprof.Index)
    router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
    router.HandleFunc("/debug/pprof/profile", pprof.Profile)
    router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
    router.HandleFunc("/debug/pprof/trace", pprof.Trace)

    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")

    srv := &http.Server{
        Addr: "0.0.0.0:" + "80",
        /*ReadTimeout:  500 * time.Millisecond,
        WriteTimeout: 500 * time.Millisecond,
        IdleTimeout:  10 * time.Second,*/
        Handler: routerMiddleWare,
    }

    log.Fatal(srv.ListenAndServe())
}

func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {

    //other part of the code and call to goroutine
    var urls []string
    results, s, c := callUrls(urls)
    finalCall(w, results, s, c)

}

type Response struct {
    Status int
    Url    string
    Body   string
}

func callUrls(urls []string) ([]*Response, []string, []string) {
    var wg sync.WaitGroup
    wg.Add(len(urls))
    ch := make(chan func() (*Response, string, string), len(urls))
    for _, url := range urls {
        go func(url string) {
            //decide if request is valid for client to make http call using country/os
            isValid := true //assuming url to be called
            if isValid {
                //make post call
                //request body has many more parameters, just a sample is included.
                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.
                req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))
                req.Header.Set("Content-Type", "application/json")
                req.Header.Set("Connection", "Keep-Alive")
                //req.Close = true

                response, err := common.Client.Do(req)

                if err != nil {
                    wg.Done()
                    ch <- func() (*Response, string, string) {
                        return &Response{Status: 500, Url: url, Body: ""}, "error", "500"
                    }
                    return
                }

                defer response.Body.Close()
                body, _ := ioutil.ReadAll(response.Body)
                _, err = io.Copy(ioutil.Discard, response.Body)

                //Close the body, forced this
                //Also tried without defer, and only without the following line
                response.Body.Close()

                //do something with response body replace a few string etc.
                //and return
                wg.Done()
                ch <- func() (*Response, string, string) {
                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"
                }

            } else {
                wg.Done()
                ch <- func() (*Response, string, string) {
                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
                }
            }

        }(url)
    }
    wg.Wait()
    var (
        results []*Response
        msg     []string
        status  []string
    )
    for {
        r, x, y := (<-ch)()
        if r != nil {

            results = append(results, r)
            msg = append(msg, x)
            status = append(status, y)
        }
        if len(results) == len(urls) {
            return results, msg, status
        }

    }
}

func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string){
    fmt.Println("response", "response body", results, msg, status)
}

vars.go

package common
import (
    "net/http"
)

var (
    //http client
    Client *http.Client

    //http Transport
    Transport *http.Transport
)

pprof: profiled the app with 4 client URLs on average, at around 2500 qps.

[image: pprof output]

Top command: [image]. After 2 minutes: [image].

Without calling the client URL, by keeping isValid = false and using time.Sleep(300 * time.Millisecond), no leak happens. [image]

Upvotes: 7

Views: 9572

Answers (2)

user12517276

Reputation: 81

I solved it by replacing the net/http package with fasthttp. I had not used it earlier because I could not find a timeout method on the fasthttp client, but there is indeed a DoTimeout method on the fasthttp client, which times out the request after the specified duration.

Here is the updated code:

In vars.go, add ClientFastHttp *fasthttp.Client (it must be initialized before use, for example in init).

main.go

package main

import (
    "./common"
    "crypto/tls"
    "fmt"
    "github.com/gorilla/mux"
    "github.com/valyala/fasthttp"
    "log"
    "math/rand"
    "net"
    "net/http"
    "net/http/pprof"
    "os"
    "runtime"
    "strconv"
    "sync"
    "time"
)

func init() {

    //Get Any command line argument passed
    args := os.Args[1:]
    numCPU := runtime.NumCPU()
    if len(args) > 1 {
        numCPU, _ = strconv.Atoi(args[0])
    }

    common.Transport = &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
        DialContext: (&net.Dialer{
            //Timeout: time.Duration() * time.Millisecond,
            KeepAlive: 30 * time.Second,
        }).DialContext,
        //ForceAttemptHTTP2:     true,
        DisableKeepAlives: false,
        //MaxIdleConns:      0,
        //IdleConnTimeout:   0,
        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
        //ExpectContinueTimeout: 1 * time.Second,
    }

    common.Client = &http.Client{
        Timeout:   time.Duration(300) * time.Millisecond,
        Transport: common.Transport,
    }

    runtime.GOMAXPROCS(numCPU)

    rand.Seed(time.Now().UTC().UnixNano())
}

func main() {

    router := mux.NewRouter().StrictSlash(true)
    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        _, _ = fmt.Fprintf(w, "Hello!!!")
    })

    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)

        prepareRequest(w, r, vars["name"])

    }).Methods("POST")

    // Register pprof handlers
    router.HandleFunc("/debug/pprof/", pprof.Index)
    router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
    router.HandleFunc("/debug/pprof/profile", pprof.Profile)
    router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
    router.HandleFunc("/debug/pprof/trace", pprof.Trace)

    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")

    srv := &http.Server{
        Addr: "0.0.0.0:" + "80",
        /*ReadTimeout:  500 * time.Millisecond,
        WriteTimeout: 500 * time.Millisecond,
        IdleTimeout:  10 * time.Second,*/
        Handler: routerMiddleWare,
    }

    log.Fatal(srv.ListenAndServe())
}

func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {

    //other part of the code and call to goroutine
    var urls []string
    results, s, c := callUrls(urls)
    finalCall(w, results, s, c)

}

type Response struct {
    Status int
    Url    string
    Body   string
}

func callUrls(urls []string) ([]*Response, []string, []string) {
    var wg sync.WaitGroup
    wg.Add(len(urls))
    ch := make(chan func() (*Response, string, string), len(urls))
    for _, url := range urls {
        go func(url string) {
            //decide if request is valid for client to make http call using country/os
            isValid := true //assuming url to be called
            if isValid {
                //make post call
                //request body has many more parameters, just a sample is included.
                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.
                req := fasthttp.AcquireRequest()
                req.SetRequestURI(url)
                req.Header.Set("Content-Type", "application/json")
                req.Header.Set("Connection", "Keep-Alive")
                req.Header.SetMethod("POST")
                req.SetBody([]byte(`{"body":"param"}`))

                resp := fasthttp.AcquireResponse()

                defer fasthttp.ReleaseRequest(req)   // <- do not forget to release
                defer fasthttp.ReleaseResponse(resp) // <- do not forget to release

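                // DoTimeout returns fasthttp.ErrTimeout if no response arrives in time.
                err := common.ClientFastHttp.DoTimeout(req, resp, 300*time.Millisecond)

                if err != nil {
                    wg.Done()
                    ch <- func() (*Response, string, string) {
                        return &Response{Status: 500, Url: url, Body: ""}, "error", "500"
                    }
                    return
                }

                // Copy the body now: the slice returned by resp.Body() is only
                // valid until the deferred ReleaseResponse runs, and the closure
                // sent on ch is invoked after this goroutine has returned.
                body := string(resp.Body())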

                /*defer response.Body.Close()
                body, _ := ioutil.ReadAll(response.Body)
                _, err = io.Copy(ioutil.Discard, response.Body)

                //Close the body, forced this
                //Also tried without defer, and only without the following line
                response.Body.Close()*/

                //do something with response body replace a few string etc.
                //and return
                wg.Done()
                ch <- func() (*Response, string, string) {
                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"
                }

            } else {
                wg.Done()
                ch <- func() (*Response, string, string) {
                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
                }
            }

        }(url)
    }
    wg.Wait()
    var (
        results []*Response
        msg     []string
        status  []string
    )
    for {
        r, x, y := (<-ch)()
        if r != nil {

            results = append(results, r)
            msg = append(msg, x)
            status = append(status, y)
        }
        if len(results) == len(urls) {
            return results, msg, status
        }

    }
}

func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
    fmt.Println("response", "response body", results, msg, status)
}

Upvotes: -1

user4466350

Reputation:

This code is not leaking.

To demonstrate, let's update it ** slightly so the post is reproducible.

main.go

package main

import (
    "bytes"
    "crypto/tls"
    _ "expvar"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "math/rand"
    "net"
    "net/http"
    _ "net/http/pprof"
    "os"
    "runtime"
    "strconv"
    "sync"
    "time"

    "github.com/gorilla/mux"
)

var (
    //http client
    Client *http.Client

    //http Transport
    Transport *http.Transport
)


func init() {

    go http.ListenAndServe("localhost:6060", nil)

    //Get Any command line argument passed
    args := os.Args[1:]
    numCPU := runtime.NumCPU()
    if len(args) > 1 {
        numCPU, _ = strconv.Atoi(args[0])
    }

    Transport = &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
        DialContext: (&net.Dialer{
            //Timeout: time.Duration() * time.Millisecond,
            KeepAlive: 30 * time.Second,
        }).DialContext,
        //ForceAttemptHTTP2:     true,
        DisableKeepAlives: false,
        //MaxIdleConns:      0,
        //IdleConnTimeout:   0,
        //TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
        //ExpectContinueTimeout: 1 * time.Second,
    }

    Client = &http.Client{
        // Timeout:   time.Duration(300) * time.Millisecond,
        Transport: Transport,
    }

    runtime.GOMAXPROCS(numCPU)

    rand.Seed(time.Now().UTC().UnixNano())
}

func main() {

    router := mux.NewRouter().StrictSlash(true)
    router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        _, _ = fmt.Fprintf(w, "Hello!!!")
    })

    router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)

        prepareRequest(w, r, vars["name"])

    }).Methods("POST", "GET")

    // Register pprof handlers
    // router.HandleFunc("/debug/pprof/", pprof.Index)
    // router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
    // router.HandleFunc("/debug/pprof/profile", pprof.Profile)
    // router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
    // router.HandleFunc("/debug/pprof/trace", pprof.Trace)

    routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")

    srv := &http.Server{
        Addr: "localhost:8080",
        /*ReadTimeout:  500 * time.Millisecond,
          WriteTimeout: 500 * time.Millisecond,
          IdleTimeout:  10 * time.Second,*/
        Handler: routerMiddleWare,
    }

    log.Fatal(srv.ListenAndServe())
}

func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {

    // go func() {
    //  make(chan []byte) <- make([]byte, 10024)
    // }()

    //other part of the code and call to goroutine
    var urls []string
    urls = append(urls,
        "http://localhost:7000/",
        "http://localhost:7000/",
    )
    results, s, c := callUrls(urls)
    finalCall(w, results, s, c)

}

type Response struct {
    Status int
    Url    string
    Body   string
}

func callUrls(urls []string) ([]*Response, []string, []string) {
    var wg sync.WaitGroup
    wg.Add(len(urls))
    ch := make(chan func() (*Response, string, string), len(urls))
    for _, url := range urls {
        go func(url string) {
            //decide if request is valid for client to make http call using country/os
            isValid := true //assuming url to be called
            if isValid {
                //make post call
                //request body has many more parameters, just a sample is included.
                //instead of creating new request, time.Sleep for 300ms doesn't cause any memory leak.
                req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))
                if err != nil {
                    wg.Done()
                    ch <- func() (*Response, string, string) {
                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"
                    }
                    return
                }
                req.Header.Set("Content-Type", "application/json")
                req.Header.Set("Connection", "Keep-Alive")
                //req.Close = true

                response, err := Client.Do(req)

                if err != nil {
                    wg.Done()
                    ch <- func() (*Response, string, string) {
                        return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"
                    }
                    return
                }

                defer response.Body.Close()
                body, _ := ioutil.ReadAll(response.Body)
                io.Copy(ioutil.Discard, response.Body)

                //Close the body, forced this
                //Also tried without defer, and only without the following line
                response.Body.Close()

                //do something with response body replace a few string etc.
                //and return
                wg.Done()
                ch <- func() (*Response, string, string) {
                    return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"
                }

            } else {
                wg.Done()
                ch <- func() (*Response, string, string) {
                    return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
                }
            }

        }(url)
    }
    wg.Wait()
    var (
        results []*Response
        msg     []string
        status  []string
    )
    for {
        r, x, y := (<-ch)()
        if r != nil {

            results = append(results, r)
            msg = append(msg, x)
            status = append(status, y)
        }
        if len(results) == len(urls) {
            return results, msg, status
        }

    }
}

func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
    fmt.Println("response", "response body", results, msg, status)
}

k/main.go

package main

import "net/http"

func main() {
    y := make([]byte, 100)
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        w.Write(y)
    })
    http.ListenAndServe(":7000", nil)
}

Install an additional visualization tool and use ab to simulate some load; that is enough for an intuitive demonstration.

go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms

At that point, read the output of expvarmon. Live, it looks something like this:

[image: app is loaded]

You can see the values fluctuating; the GC is actively working.

The app is loaded and memory is being consumed. Wait for the server to release its connections and for the GC to clean them up:

[image: app is gced]

You can see that memstats.Alloc, memstats.HeapAlloc and memstats.HeapInuse are now reduced, as expected when the GC does its job and no leak exists.
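The same before/after comparison can be reproduced in a few lines without expvarmon. This is only a minimal sketch: the helper names heapAlloc and measure and the 64 MiB buffer size are made up for illustration, and runtime.GC() is called explicitly so the numbers are comparable.

```go
package main

import (
	"fmt"
	"runtime"
)

// heapAlloc forces a collection, then returns the bytes of allocated heap objects.
func heapAlloc() uint64 {
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.HeapAlloc
}

// measure samples HeapAlloc before, while holding, and after dropping
// a 64 MiB buffer (an arbitrary size chosen for the illustration).
func measure() (before, held, after uint64) {
	before = heapAlloc()

	buf := make([]byte, 64<<20)
	buf[0] = 1 // touch the buffer so it is really used
	held = heapAlloc()
	runtime.KeepAlive(buf) // buf stays reachable up to this point

	buf = nil // drop the only reference
	after = heapAlloc()
	return before, held, after
}

func main() {
	before, held, after := measure()
	fmt.Printf("before=%d MiB held=%d MiB after=%d MiB\n", before>>20, held>>20, after>>20)
}
```

If memory is merely in use, the last value falls back close to the first one. With a real leak it would stay near the middle value, because something still references the data.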

If you check go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap right after ab has run:

[image: pprof inuse_space]

It shows that the app is using 177MB of memory.

Most of it, 102MB, is used by net/http.Transport.getConn.

Your handler accounts for 1MB; the rest is various required things.

If you took the screenshot after the server had released its connections and the GC had run, you would see an even smaller graph (not demonstrated here).

Now let us generate a leak and see it using both tools again.

In the code, uncomment the following:


func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {

    go func() {
        make(chan []byte) <- make([]byte, 10024)
    }()
//...

Restart the apps (press q in expvarmon, although it is not required):

go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms

It shows:

[image: leaky app before gc]

[image: leaky app after gc]

In expvarmon you can see the same behavior; only the numbers have changed. At rest, after it has been GCed, the app still consumes a lot of memory, far more than an empty Go HTTP server would, to take a comparison point.

Again, take a screenshot of the heap. It shows that your handler is now consuming most of the memory, ~450MB. Notice the arrows: they show 452MB of 10kB allocations and 4.50MB of 96B allocations. They correspond, respectively, to the []byte slice being pushed and to the chan []byte it is pushed to.

[image: leaky app pprof]

Finally, you can check your stack traces to look for dead goroutines, which leak memory. Open http://localhost:6060/debug/pprof/goroutine?debug=1

goroutine profile: total 50012

50000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281
#   0x76b85c    main.prepareRequest.func1+0x4c  /home/mh-cbon/gow/src/test/oom/main.go:101

4 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281
#   0x42b685    internal/poll.runtime_pollWait+0x55     /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182
#   0x4c3a3a    internal/poll.(*pollDesc).wait+0x9a     /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87
// more...

It tells us that the program is hosting 50,012 goroutines, then lists them grouped by file position. The first number is the count of running instances, 50,000 in the first group of this example. It is followed by the stack trace that led to the goroutine's creation.

You can see there are a bunch of system goroutines; in your case, you should not worry much about them.

You have to look for those that you believe should not be alive if your program were working as you think it should.

However, overall your code is not satisfactory and could, and probably should, be improved with a thorough review of its allocations and overall design.

** This is a summary of the changes applied to the original source code:

  • It adds a new program, k/main.go, to act as a backend server.
  • It adds the _ "expvar" import statement.
  • It starts the standard library HTTP server instance that pprof registers onto during the init phase, with go http.ListenAndServe("localhost:6060", nil).
  • The client timeout, Timeout: time.Duration(300) * time.Millisecond, is disabled; otherwise the load test does not return 200s.
  • The server address is set to Addr: "localhost:8080".
  • The urls values created within prepareRequest are set to a static list of len=2.
  • It adds error checking for req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`))).
  • It disables error checking in io.Copy(ioutil.Discard, response.Body).

Upvotes: 7
