Mike

Reputation: 790

Go code with execution control using channels

I'm extracting all the data from a large Redshift table in chunks, writing each chunk to a CSV file. I want to control how many files are created concurrently. For example, if the whole process creates 10 files, I want to create 4 files, wait until they are done, then create another 4, and finally the remaining 2.

How can I achieve this using channel/s?

I've tried changing the following slice to a channel, but I couldn't get it to work as described: my implementation did not wait for the first 4 files to finish before creating the next ones.

Right now I'm doing the following using WaitGroup:

package, imports, var, etc...

//Inside func main:

//Create a WaitGroup
var wg = sync.WaitGroup{}

//Opening the connection
db, err := sql.Open("postgres", connStr)
if err != nil {
    panic(err.Error())
}
defer db.Close()

//Define chunks using a slice
chunkSizer := Slicer(totalRowsInTable, numberRowsByChunk) // e.g. []int{100, 100, 100...  100}

//Iterating over the slice
for index, value := range chunkSizer {
    wg.Add(1)
    go ExtractorToCSV(db, queriedSection, expFileName)

    if (index+1)%4 == 0 { // <-- 4 is the maximum number of files created at the "same" time
        wg.Wait()
    }
}

wg.Wait() // <-- after the loop: waits for the remaining files (last 2 in this case)

//Outside main
func ExtractorToCSV(db *sql.DB, queryToExtract, fileName string) {
    //... do its process
    wg.Done() // wg must be in scope here, e.g. declared at package level
}

I've also tried using a buffered channel whose capacity is the limit I want (4 in this case), but I didn't use it properly.

Thanks in advance.

Upvotes: 0

Views: 118

Answers (1)

Fahim Bagar

Reputation: 828

UPDATED - STOP CONDITION

You can use a channel to block the loop until the running goroutines have finished, like this. This is a minimal example; tweak it as you like.

var doneCh = make(chan bool)

func main() {
    WRITE_POOL := 4
    MAX := len(RANGE) // RANGE is a placeholder for your slice of chunks

    for index, val := range RANGE {
        go extractToFile(val)

        if (index+1)%WRITE_POOL == 0 {
            // a full batch has been started:
            // wait for all WRITE_POOL goroutines to signal doneCh
            for i := 0; i < WRITE_POOL; i++ {
                <-doneCh
            }
        } else if index == MAX-1 {
            // last iteration with an incomplete batch:
            // wait for the MAX % WRITE_POOL goroutines still running
            LEFT := MAX % WRITE_POOL
            for i := 0; i < LEFT; i++ {
                <-doneCh
            }
        }
    }
}

func extractToFile(val int) {
    // create the file and close it; errors are ignored to keep this short
    if f, err := os.Create(fmt.Sprintf("test-%d", val)); err == nil {
        f.Close()
    }
    doneCh <- true
}
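
Since the question mentions trying a buffered channel of size 4, here is a minimal sketch of how that is commonly done: the buffered channel acts as a counting semaphore. Note that this behaves slightly differently from the batch approach above: it does not wait for a whole group of 4 to finish, it starts the next task as soon as any slot frees up, while still never running more than 4 at once. The names runLimited, sem and the task callback are made up for illustration, and the sleep only stands in for the real export.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs task once per job, with at most limit tasks running
// at the same time. It returns after every task has finished.
func runLimited(jobs []int, limit int, task func(job int)) {
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var wg sync.WaitGroup

	for _, job := range jobs {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the task ends
			task(job)
		}(job)
	}
	wg.Wait() // wait for the tasks that are still running
}

func main() {
	chunks := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} // hypothetical chunk numbers
	runLimited(chunks, 4, func(chunk int) {
		// Stand-in for the real work (e.g. a call like ExtractorToCSV).
		time.Sleep(10 * time.Millisecond)
		fmt.Printf("chunk %d written\n", chunk)
	})
	fmt.Println("all chunks done")
}
```

If you really need strict batches (4, then 4, then 2), keep the counting version above; the semaphore is usually preferable when the chunks take uneven amounts of time.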

For better performance, try to:

  1. Create a data channel so that main can send data to it and ExtractorToCSV can receive from it.
  2. Run ExtractorToCSV as a goroutine that reads from the data channel; each time it finishes a piece of work, it sends a value to doneCh.
  3. Send the db data to the data channel, and after ExtractorToCSV has finished writing the file, send true to doneCh.
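
The three steps above can be sketched roughly as follows. This is only an illustration under assumptions: worker, runPool, dataCh and the handle callback are hypothetical names, chunks are represented by plain ints, and the real query and CSV writing would go where handle is called. A fixed number of workers (4 here) bounds how many files are written at once.

```go
package main

import "fmt"

// worker receives chunk numbers from dataCh, handles each one, and
// reports every finished chunk on doneCh. It returns when dataCh is closed.
func worker(dataCh <-chan int, doneCh chan<- bool, handle func(chunk int)) {
	for chunk := range dataCh {
		handle(chunk)
		doneCh <- true
	}
}

// runPool starts poolSize workers (poolSize must be at least 1), feeds them
// chunks 0..total-1, and returns the number of completion signals received.
func runPool(total, poolSize int, handle func(chunk int)) int {
	dataCh := make(chan int)
	doneCh := make(chan bool, total) // buffered so workers never block when reporting

	for i := 0; i < poolSize; i++ {
		go worker(dataCh, doneCh, handle)
	}
	for chunk := 0; chunk < total; chunk++ {
		dataCh <- chunk // blocks until some worker is free to take it
	}
	close(dataCh) // no more work: lets the workers exit their loops

	finished := 0
	for finished < total {
		<-doneCh
		finished++
	}
	return finished
}

func main() {
	n := runPool(10, 4, func(chunk int) {
		// Stand-in for querying one chunk and writing it to a CSV file.
		fmt.Printf("chunk %d written\n", chunk)
	})
	fmt.Println("finished chunks:", n)
}
```

Because only poolSize goroutines exist, no counting of batches is needed in main; it just sends all the work and then collects one done signal per chunk.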

I will update this post if you need more examples.

Upvotes: 0
