Vinny
Vinny

Reputation: 149

Concurrency in Go

How to I go about implementing the aggregation pattern in Go, I have to send a bunch of http request concurrently where each go routine will call the endpoint and send the response status on a channel. Now on the main calling function I will range through the channel and display all the responses.

The problem is how do I unblock the channel ?? - I cannot close the channel from the go routines as it will be closed before the complete work is done

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"

    "golang.org/x/net/context"
)

func main() {

    var wg sync.WaitGroup
    wg.Add(10)
    c := make(chan string, 100)
    ctx := context.Background()

    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

    defer cancel()
    for i := 1; i <= 10; i++ {
        go SendHttpRequest(ctx, c, &wg)
    }

    for v := range c {
        fmt.Println(v)
    }

    wg.Wait()

}

func SendHttpRequest(ctx context.Context, c chan string, wg *sync.WaitGroup) {

    //defer wg.Done()
    client := http.Client{}
    req, err := http.NewRequest("POST", "https://jsonplaceholder.typicode.com/posts/1", nil)
    if err != nil {
        panic(err)
    }
    req.WithContext(ctx)

    res, _ := client.Do(req)

    select {
    case <-time.After(1 * time.Microsecond):
        c <- res.Status
    case <-ctx.Done():
        c <- "599 ToLong"
    }
    if res != nil {
        defer res.Body.Close()
    }
    //close(c)
    defer wg.Done()
}

Upvotes: 1

Views: 91

Answers (2)

DBZ7
DBZ7

Reputation: 57

In this kind of situation use a generator and idiomatic early defer patterns:

import (
    "fmt"
    "errors"
    "net/http"
    "sync"
    "time"

    "golang.org/x/net/context"
)

func main() {

    ctx := context.Background()
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel() // defer early context cancel 

    for v := range requests(ctx) {
        fmt.Println(v)
    }

}

// requests generator (handling synchro)
func requests(ctx context.Context)<-chan string {

    c := make(chan string/*, 100*/) // No need for buffer, do it on the fly
    go func(){
        defer close(c) // defer early chan close, will also check goroutine ending
        
        var wg sync.WaitGroup
        defer wg.Wait() // defer early wait
    
        wg.Add(10)
        for i := 1; i <= 10; i++ {
            go func() {
                defer wg.Done() // defer early goroutine waitgroup done

                if status, err := SendHttpRequest(ctx, c); err != nil {
                    c <- status
                }
            }()
        }
    }

    return c
}

// SendHttpRequest looks more conventional, no goroutines, no syncro (waitgroup not spread)
func SendHttpRequest(ctx context.Context) (status string, err error) {

    client := http.Client{}
    req, err := http.NewRequest("POST", "https://jsonplaceholder.typicode.com/posts/1", nil)
    if err != nil {
        return
    }
    req.WithContext(ctx)

    res, err := client.Do(req)
    if err != nil {
        if errors.Is(err, context.Canceled) { // check that request was not cancelled by context cancel trigger
            status = "599 ToLong"
        }
        return 
    }
    defer res.Body.Close() // defer early response body close (in case of no error)
    status = res.Status
    return
}

Upvotes: 0

poy
poy

Reputation: 10507

Use the WaitGroup

go func(){
  wg.Wait()
  close(c)
}()

for v := range c {
  fmt.Println(v)
}

// Don't bother with wg.Wait() here

Upvotes: 2

Related Questions