Reputation: 149
How to I go about implementing the aggregation pattern in Go, I have to send a bunch of http request concurrently where each go routine will call the endpoint and send the response status on a channel. Now on the main calling function I will range through the channel and display all the responses.
The problem is how do I unblock the channel ?? - I cannot close the channel from the go routines as it will be closed before the complete work is done
package main
import (
"fmt"
"net/http"
"sync"
"time"
"golang.org/x/net/context"
)
func main() {
var wg sync.WaitGroup
wg.Add(10)
c := make(chan string, 100)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
for i := 1; i <= 10; i++ {
go SendHttpRequest(ctx, c, &wg)
}
for v := range c {
fmt.Println(v)
}
wg.Wait()
}
func SendHttpRequest(ctx context.Context, c chan string, wg *sync.WaitGroup) {
//defer wg.Done()
client := http.Client{}
req, err := http.NewRequest("POST", "https://jsonplaceholder.typicode.com/posts/1", nil)
if err != nil {
panic(err)
}
req.WithContext(ctx)
res, _ := client.Do(req)
select {
case <-time.After(1 * time.Microsecond):
c <- res.Status
case <-ctx.Done():
c <- "599 ToLong"
}
if res != nil {
defer res.Body.Close()
}
//close(c)
defer wg.Done()
}
Upvotes: 1
Views: 91
Reputation: 57
In this kind of situation use a generator and idiomatic early defer patterns:
import (
"fmt"
"errors"
"net/http"
"sync"
"time"
"golang.org/x/net/context"
)
func main() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() // defer early context cancel
for v := range requests(ctx) {
fmt.Println(v)
}
}
// requests generator (handling synchro)
func requests(ctx context.Context)<-chan string {
c := make(chan string/*, 100*/) // No need for buffer, do it on the fly
go func(){
defer close(c) // defer early chan close, will also check goroutine ending
var wg sync.WaitGroup
defer wg.Wait() // defer early wait
wg.Add(10)
for i := 1; i <= 10; i++ {
go func() {
defer wg.Done() // defer early goroutine waitgroup done
if status, err := SendHttpRequest(ctx, c); err != nil {
c <- status
}
}()
}
}
return c
}
// SendHttpRequest looks more conventional, no goroutines, no syncro (waitgroup not spread)
func SendHttpRequest(ctx context.Context) (status string, err error) {
client := http.Client{}
req, err := http.NewRequest("POST", "https://jsonplaceholder.typicode.com/posts/1", nil)
if err != nil {
return
}
req.WithContext(ctx)
res, err := client.Do(req)
if err != nil {
if errors.Is(err, context.Canceled) { // check that request was not cancelled by context cancel trigger
status = "599 ToLong"
}
return
}
defer res.Body.Close() // defer early response body close (in case of no error)
status = res.Status
return
}
Upvotes: 0
Reputation: 10507
Use the WaitGroup
go func(){
wg.Wait()
close(c)
}()
for v := range c {
fmt.Println(v)
}
// Don't bother with wg.Wait() here
Upvotes: 2