MikeKlemin

Reputation: 939

Go: channel many slow API queries into a single SQL transaction

I wonder what the idiomatic way to do the following would be. I have N slow API queries and one database connection. I want a buffered channel that the responses arrive on, and one database transaction that I use to write the data. The only thing I could come up with is a semaphore, as in the following made-up example:

    func myFunc() {
        // 10 concurrent API calls
        sem := make(chan bool, 10)
        // A concurrency-safe map as buffer
        var myMap MyConcurrentMap

        for i := 0; i < N; i++ {
            sem <- true
            go func(i int) {
                defer func() { <-sem }()
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
                myMap.Put(resp)
            }(i)
        }

        // Fill the semaphore to capacity: this only succeeds once
        // every goroutine has released its slot
        for j := 0; j < cap(sem); j++ {
            sem <- true
        }
        tx, _ := db.Begin()
        for data := range myMap {
            tx.Exec("Insert data into database", data)
        }
        tx.Commit()
    }

I am fairly sure there is a simpler, cleaner and more proper solution, but it seems hard for me to grasp.

EDIT: Well, I came up with the following solution. This way I do not need the buffer map: as soon as data arrives on the resp channel it is printed, or it could be inserted into the database instead. It works, but I am still not sure everything is OK; at least there are no races.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Global WaitGroup
var wg sync.WaitGroup

func init() {
    // Just for fun, seed the random generator
    rand.Seed(time.Now().UnixNano())
}

//Emulate a slow API call
func verySlowAPI(id int) int {
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)
    return n
}

func main() {
    //Amount of tasks
    N := 100

    //Concurrency level
    concur := 10

    //Channel for tasks
    tasks := make(chan int, N)

    //Channel for responses
    resp := make(chan int, 10)

    // Start concur worker goroutines
    wg.Add(concur) 
    for i := 1; i <= concur; i++ {
        go worker(tasks, resp)
    }

    //Add tasks
    for i := 0; i < N; i++ {
        tasks <- i
    }

    // Collect exactly N responses from the workers
    for i := 0; i < N; i++ {
        fmt.Printf("%d\n", <-resp)
    }

    //close the tasks channel
    close(tasks)

    //wait till finish
    wg.Wait()

}

func worker(tasks <-chan int, resp chan<- int) {
    defer wg.Done()
    // range exits once tasks is closed and drained, so no ok check
    // (and no shadowed variable) is needed
    for task := range tasks {
        resp <- verySlowAPI(task)
    }
}
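One gap in the solution above is that each response is a bare int, so the consumer cannot tell which request a value came from or whether the call failed, both of which matter before writing rows in a transaction. A minimal sketch, assuming a hypothetical `result` struct and a `verySlowAPI` that can return an error (neither is part of the original code), that sends the id and error along with the value and passes the WaitGroup as a parameter instead of using a global:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// result is an illustrative type: it carries the task id and any error
// with the value, so the consumer knows which request each row came from.
type result struct {
	id    int
	value int
	err   error
}

// verySlowAPI is a hypothetical stand-in; it fails for id 13 only.
func verySlowAPI(id int) (int, error) {
	time.Sleep(time.Millisecond)
	if id == 13 {
		return 0, errors.New("api error")
	}
	return id * 2, nil
}

func worker(tasks <-chan int, resp chan<- result, wg *sync.WaitGroup) {
	defer wg.Done()
	for id := range tasks {
		v, err := verySlowAPI(id)
		resp <- result{id: id, value: v, err: err}
	}
}

// run processes n tasks with concur workers and splits the responses
// into successes and failures.
func run(n, concur int) (ok []result, failed []result) {
	tasks := make(chan int, n) // buffered for all tasks, so sending never blocks
	resp := make(chan result, concur)
	var wg sync.WaitGroup

	wg.Add(concur)
	for i := 0; i < concur; i++ {
		go worker(tasks, resp, &wg)
	}
	for i := 0; i < n; i++ {
		tasks <- i
	}
	close(tasks) // workers' range loops end once the queue is drained

	for i := 0; i < n; i++ {
		r := <-resp
		if r.err != nil {
			failed = append(failed, r)
			continue
		}
		ok = append(ok, r)
	}
	wg.Wait()
	return ok, failed
}

func main() {
	ok, failed := run(20, 4)
	fmt.Println(len(ok), len(failed), failed[0].id)
}
```

With a real transaction, the consumer loop would call `tx.Exec` for each successful result and roll back if any result carries an error.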

Upvotes: 0

Views: 840

Answers (2)

Adrian

Reputation: 46413

There's no need to use a channel as a semaphore to detect completion; sync.WaitGroup was made for waiting on a set of goroutines to complete.

If you're using the channel to limit throughput, you're better off with a worker pool, and using the channel to pass jobs to the workers:

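type job struct {
    i int
}

func myFunc(N int) {
    // Adjust buffer sizes as needed for the total number of tasks
    work := make(chan job, 10)
    // res being whatever type slowAPICall returns
    results := make(chan res, 10)
    resBuff := make([]res, 0, N)

    wg := new(sync.WaitGroup)

    // 10 concurrent API calls
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range work {
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", j.i))
                results <- resp
            }
        }()
    }

    // Collect results in a single goroutine; done is closed once
    // results has been closed and fully drained
    done := make(chan struct{})
    go func() {
        for r := range results {
            resBuff = append(resBuff, r)
        }
        close(done)
    }()

    for i := 0; i < N; i++ {
        work <- job{i}
    }
    close(work)

    wg.Wait()      // all workers returned, nothing else sends on results
    close(results) // ends the collector's range loop
    <-done         // resBuff is now complete and safe to read

    // Write resBuff to the database in a single transaction here
}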
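A variation on the worker pool above that skips the intermediate buffer: the calling goroutine is the only consumer of `results` and writes each response through the transaction as it arrives, while a small goroutine closes `results` after `wg.Wait()`. This is a sketch under assumptions: `execer` is a hypothetical one-method interface that `*sql.Tx` satisfies, and `fakeTx`, `slowAPICall`, the table `t` and the `?` placeholder are illustrative only (placeholder syntax depends on the driver).

```go
package main

import (
	"database/sql"
	"fmt"
	"sync"
	"time"
)

// execer is the one *sql.Tx method this sketch needs.
type execer interface {
	Exec(query string, args ...any) (sql.Result, error)
}

// fakeTx records inserted values so the example runs without a database.
type fakeTx struct{ rows []int }

func (f *fakeTx) Exec(query string, args ...any) (sql.Result, error) {
	f.rows = append(f.rows, args[0].(int))
	return nil, nil
}

// slowAPICall is a hypothetical stand-in for the real request.
func slowAPICall(i int) int {
	time.Sleep(5 * time.Millisecond)
	return i * i
}

// fetchAndStore runs n calls on a pool of workers and writes every
// response through tx from the calling goroutine only, so the
// transaction is never used concurrently.
func fetchAndStore(tx execer, n, workers int) error {
	jobs := make(chan int)
	results := make(chan int, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				results <- slowAPICall(i)
			}
		}()
	}

	// Feed the jobs, then close results once every worker has returned.
	go func() {
		for i := 0; i < n; i++ {
			jobs <- i
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()

	var firstErr error
	for r := range results {
		if firstErr != nil {
			continue // keep draining so the workers can exit
		}
		if _, err := tx.Exec("INSERT INTO t (v) VALUES (?)", r); err != nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	tx := &fakeTx{}
	if err := fetchAndStore(tx, 20, 5); err != nil {
		fmt.Println("rollback:", err)
		return
	}
	fmt.Println("commit", len(tx.rows), "rows")
}
```

With a real database, the caller would obtain `tx` from `db.Begin()`, call `tx.Rollback()` if `fetchAndStore` returns an error, and `tx.Commit()` otherwise.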

Upvotes: 2

Pavlo Strokov

Reputation: 2087

Maybe this will work for you. Now you can get rid of your concurrent map. Here is a code snippet:

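func myFunc() error {
    // 10 concurrent API calls
    sem := make(chan bool, 10)
    respCh := make(chan YOUR_RESP_TYPE, 10)
    var responses []YOUR_RESP_TYPE

    // Start the collector before launching the workers: otherwise, once
    // respCh is full, workers block on the send while still holding their
    // sem slot, and the launch loop deadlocks on sem <- true.
    respCollected := make(chan struct{})
    go func() {
        for i := 0; i < N; i++ {
            responses = append(responses, <-respCh)
        }
        close(respCollected)
    }()

    for i := 0; i < N; i++ {
        sem <- true
        go func(i int) {
            defer func() {
                <-sem
            }()
            resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
            respCh <- resp
        }(i)
    }

    // Wait until all N responses have been collected
    <-respCollected
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    for _, data := range responses {
        // Hypothetical table/column; placeholder syntax (? or $1) depends on the driver
        if _, err := tx.Exec("INSERT INTO mytable (col) VALUES (?)", data); err != nil {
            tx.Rollback()
            return err
        }
    }
    return tx.Commit()
}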

Then we need one more goroutine that collects all responses from the response channel into a slice or map.

Upvotes: 1
