Reputation: 790
I'm extracting from a long redshift table all its data in chunks, each chunk to a csv file. I want to control how many files are created at the "same" time (concurrently), i.e. if the whole process will create 10 files, I want to, let's say, create 4 files, wait until they are created and once they are "done", create another 4, and then the remaining 2.
How can I achieve this using channel/s?
I've tried to change the following slice to a channel, but I couldn't get it to work as I said, the implementation I did, did not wait/stop for the 4 first files to end before creating the following ones.
Right now I'm doing the following using WaitGroup:
package, imports, var, etc...
//Inside func main:
//Create a WaitGroup
var wg = sync.WaitGroup{}
//Opening the connection
db, err := sql.Open("postgres", connStr)
if err != nil {
panic(err.Error())
}
defer db.Close()
//Define chunks using a slice
chunkSizer := Slicer(totalRowsInTable, numberRowsByChunk) // e.g. []int{100, 100, 100... 100}
//Iterating over the array
for index, value := range chunkSizer {
wg.Add(1)
go ExtractorToCSV(db, queriedSection, expFileName)
if (index+1)% 4 == 0 { // <-- 4 is the maximum number of files created at the "same" time
wg.Wait()
}
wg.Wait() // <-- waits for the remaining files (last 2 in this case)
}
//Outside main
func ExtractorToCSV(db *sql.DB, queryToExtract, fileName string) {
//... do its process
wg.Done()
}
I've tried using a buffered channel of the size that I wanted to stop (4 in this case), but I didn't use it properly, I don't know...
Thanks in advance.
Upvotes: 0
Views: 118
Reputation: 828
UPDATED - STOP CONDITION
You can use channel to hold the next line of code like this. This is minimum code that I write for you. Tweak it as you like
var doneCh = make(chan bool)
func main() {
WRITE_POOL := 4
for index, val := range RANGE {
go extractToFile(val)
if (index + 1) % WRITE_POOL == 0 {
// wait for doneCh to finish
// if the iteration is divisive of WRITE_POOL
<-doneCh
<-doneCh
<-doneCh
<-doneCh
} else if index == MAX - 1 {
// wait for whatever doneCh left to finish
// if current val is the last one
LEFT := MAX - index - 1
for i := 0; i < LEFT; i++ {
<-doneCh
}
}
}
}
func extractToFile(val int) {
os.Create(fmt.Sprintf("test-%d", val))
doneCh<-true
}
For better performance, try to :
db
data to data channel and after ExtractorToCSV finished to write to file, send true to doneCh.I will update this post if you need more example.
Upvotes: 0