Reputation: 939
I wonder what would be an idiomatic way to do the following. I have N slow API queries and one database connection. I want a buffered channel where the responses will arrive, and one database transaction that I will use to write the data. The only thing I could come up with is the semaphore approach in the following made-up example:
func myFunc() {
    // 10 concurrent API calls
    sem := make(chan bool, 10)
    // A concurrency-safe map used as a buffer
    var myMap MyConcurrentMap
    for i := 0; i < N; i++ {
        sem <- true
        go func(i int) {
            defer func() { <-sem }()
            resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
            myMap.Put(resp)
        }(i)
    }
    // Fill the semaphore to wait for all in-flight calls to finish
    for j := 0; j < cap(sem); j++ {
        sem <- true
    }
    tx, _ := db.Begin()
    for data := range myMap {
        tx.Exec("Insert data into database")
    }
    tx.Commit()
}
I am nearly sure there is a simpler, cleaner and more proper solution, but it seems complicated for me to grasp.
EDIT:
Well, I came up with the following solution. This way I do not need the buffer map: once data arrives on the resp channel it is printed, or it could be used to insert into a database. It works, though I am still not sure everything is OK; at least there are no races.
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Global WaitGroup
var wg sync.WaitGroup

func init() {
    // just for fun's sake, seed rand
    rand.Seed(time.Now().UnixNano())
}

// Emulate a slow API call
func verySlowAPI(id int) int {
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)
    return n
}

func main() {
    // Amount of tasks
    N := 100
    // Concurrency level
    concur := 10
    // Channel for tasks
    tasks := make(chan int, N)
    // Channel for responses
    resp := make(chan int, 10)
    // 10 concurrent goroutines
    wg.Add(concur)
    for i := 1; i <= concur; i++ {
        go worker(tasks, resp)
    }
    // Add tasks
    for i := 0; i < N; i++ {
        tasks <- i
    }
    // Collect data from the goroutines
    for i := 0; i < N; i++ {
        fmt.Printf("%d\n", <-resp)
    }
    // close the tasks channel
    close(tasks)
    // wait till finished
    wg.Wait()
}

func worker(tasks chan int, resp chan<- int) {
    defer wg.Done()
    // range exits once the tasks channel is closed and drained
    for t := range tasks {
        resp <- verySlowAPI(t)
    }
}
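To circle back to the original goal of writing everything inside one database transaction, the collection loop in main could insert instead of print. This is only a rough sketch: db is an assumed *sql.DB, the log package is assumed to be imported, and the INSERT statement is a placeholder whose table and parameter syntax depend on your schema and driver.

tx, err := db.Begin()
if err != nil {
    log.Fatal(err)
}
for i := 0; i < N; i++ {
    n := <-resp
    // placeholder statement and table; adjust to your schema and driver
    if _, err := tx.Exec("INSERT INTO results(value) VALUES(?)", n); err != nil {
        tx.Rollback()
        log.Fatal(err)
    }
}
if err := tx.Commit(); err != nil {
    log.Fatal(err)
}

Because the loop reads exactly N values from resp, the single transaction sees every response without any extra synchronization.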
Upvotes: 0
Views: 840
Reputation: 46413
There's no need to use channels as a semaphore; sync.WaitGroup was made for waiting for a set of goroutines to complete.
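For illustration, a minimal sketch of the WaitGroup-only approach (fetchAll is a hypothetical name, slowAPICall is assumed to return some type res, and note this starts all N calls at once with no concurrency limit):

func fetchAll(N int) []res {
    var wg sync.WaitGroup
    // buffered with capacity N so senders never block
    results := make(chan res, N)
    for i := 0; i < N; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            results <- slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
        }(i)
    }
    wg.Wait()
    close(results)
    out := make([]res, 0, N)
    for r := range results {
        out = append(out, r)
    }
    return out
}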
If you're using the channel to limit throughput, you're better off with a worker pool, using the channel to pass jobs to the workers:
type job struct {
    i int
}

func myFunc(N int) {
    // Adjust the buffer size as needed for the total number of tasks
    work := make(chan job, 10)
    // res being whatever type slowAPICall returns
    results := make(chan res, 10)
    resBuff := make([]res, 0, N)
    wg := new(sync.WaitGroup)
    // 10 concurrent API calls
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            for j := range work {
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", j.i))
                results <- resp
            }
            wg.Done()
        }()
    }
    // Collector goroutine; done signals that every result has been appended
    done := make(chan struct{})
    go func() {
        for r := range results {
            resBuff = append(resBuff, r)
        }
        close(done)
    }()
    for i := 0; i < N; i++ {
        work <- job{i}
    }
    close(work)
    wg.Wait()
    close(results)
    <-done
}
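Once the collector has drained results (the done channel above), the single database transaction can follow at the end of myFunc. A rough sketch, assuming a db handle and the log package are in scope and keeping the question's placeholder SQL:

// at the end of myFunc, after <-done
tx, err := db.Begin()
if err != nil {
    log.Fatal(err)
}
for range resBuff {
    // placeholder from the question; real SQL and arguments go here
    tx.Exec("Insert data into database")
}
tx.Commit()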
Upvotes: 2
Reputation: 2087
Maybe this will work for you. Now you can get rid of your concurrent map. Here is a code snippet:
func myFunc() {
    // 10 concurrent API calls
    sem := make(chan bool, 10)
    respCh := make(chan YOUR_RESP_TYPE, 10)
    var responses []YOUR_RESP_TYPE
    for i := 0; i < N; i++ {
        sem <- true
        go func(i int) {
            defer func() {
                <-sem
            }()
            resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
            respCh <- resp
        }(i)
    }
    respCollected := make(chan struct{})
    go func() {
        for i := 0; i < N; i++ {
            responses = append(responses, <-respCh)
        }
        close(respCollected)
    }()
    <-respCollected
    tx, _ := db.Begin()
    for _, data := range responses {
        tx.Exec("Insert data into database")
    }
    tx.Commit()
}
Then we use one more goroutine that collects all the responses from the response channel into a slice or map.
Upvotes: 1