Reputation: 902
I'm trying to mock a CDN server worker using Go channels and the worker-group pattern.
func main() {
var wg sync.WaitGroup
fileJobs := make(chan string)
compress := make(chan CompressionStatus)
upload := make(chan UploadResult)
// get list of files to distribute from a text file. This is usually 1-1.5 Gb, just filenames
go listFiles("/Users/rverma/go/src/github.com/dsa/gopher/images.txt", fileJobs)
go listFiles("/Users/rverma/go/src/github.com/dsa/gopher/videos.txt", fileJobs)
go compressFile(fileJobs, compress)
go compressFile(fileJobs, compress)
go compressFile(fileJobs, compress)
go compressFile(fileJobs, compress)
go compressFile(fileJobs, compress)
go compressFile(fileJobs, compress)
wg.Add(3)
go uploadToAll(compress, upload, &wg)
go uploadToAll(compress, upload, &wg)
go uploadToAll(compress, upload, &wg)
wg.Wait()
close(fileJobs)
close(compress)
close(upload)
}
func listFiles(filename string, c chan<- string) {
file, err := os.OpenFile(filename, os.O_RDONLY, os.ModePerm)
if err != nil {
panic("file not found")
}
defer file.Close()
r := bufio.NewScanner(file)
for r.Scan() {
c <- r.Text()
}
}
type CompressionStatus struct {
file string
compressed string
status bool
}
func compressFile(fileJob <-chan string, out chan<- CompressionStatus) {
for fileName := range fileJob {
fmt.Printf("compressing %s\n", fileName)
fib(25) // calculate a Fibonacci number to keep the CPU busy
fmt.Printf("compressed %s\n", fileName)
out <- CompressionStatus{
file: fileName,
compressed: fileName + ".compressed",
status: true,
}
}
}
func uploadToAll(comressedFile <-chan CompressionStatus, result chan<- UploadResult, wg *sync.WaitGroup) {
for fileName := range comressedFile {
go func() {
result <- US(fileName.compressed)
}()
go func() {
result <- IND(fileName.compressed)
}()
}
wg.Done()
}
type UploadResult string
type UploadServer func(region string) UploadResult
var (
US = fileUploader("US")
IND = fileUploader("IND")
)
// can't change construct
func fileUploader(region string) UploadServer {
sleep := time.Millisecond * 500
if region == "IND" {
sleep *= 4
}
return func(fileName string) UploadResult {
fmt.Printf("uploading %s to server %s\n", fileName, region)
time.Sleep(sleep)
fmt.Printf("upload %s completed to server %s\n", fileName, region)
return UploadResult(region)
}
}
Although the code works, when it finishes it reports a fatal error like the one below. It seems to happen when we close the function that is receiving from the channel while the loop that is sending is still active.
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc00009a004)
/usr/local/Cellar/go/1.13.3/libexec/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00009a004)
/usr/local/Cellar/go/1.13.3/libexec/src/sync/waitgroup.go:130 +0x64
main.main()
/**/file_compressor.go:34 +0x2e1
goroutine 20 [chan receive]:
main.compressFile(0xc000074060, 0xc0000740c0)
/**/file_compressor.go:75 +0x230
created by main.main
/**/file_compressor.go:22 +0x13e
I'm a bit confused about the best way to address this. Also, this is supposed to run on an 8-core machine; I have added 6 worker goroutines for compression because it is a CPU-intensive process. I'm wondering whether we can optimize this or make the code a bit cleaner.
Upvotes: 0
Reputation: 51512
compressFile is waiting to read from fileJob, but there are no goroutines left that write to it. uploadToAll is likewise waiting to receive from comressedFile, which will never deliver anything, so wg.Done is never called. Because of that, the main goroutine is stuck in wg.Wait. Every goroutine is waiting on something and none of them can make progress, hence the deadlock.
One way to deal with this is to close each channel once you're done writing to it. That terminates the for loops that range over the channel. Since several goroutines write to the same channel, no single writer knows when everyone is done. You can add a new WaitGroup for the listFiles goroutines, wait for those two to complete, and then close the channel. Do the same for the compressFile and uploadToAll groups. You can do something like this:
wgList.Add(2)
go func() {
wgList.Wait()
close(fileJobs)
}()
go listFiles(&wgList,...)
...
Don't forget this:
func listFiles(wgList *sync.WaitGroup,...) {
defer wgList.Done()
...
}
And similarly for the other groups.
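Putting the pieces together, here is a minimal sketch of the whole pipeline with one WaitGroup per stage. It is not your exact program: produce, compress, upload and runPipeline are hypothetical names, the file lists are in-memory slices instead of text files, and the compression and upload bodies are placeholders. Note that something also has to read from the last channel; in this sketch the calling goroutine drains it, otherwise the uploaders would block on send.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Placeholder types named after the ones in the question.
type CompressionStatus struct {
	file, compressed string
}

type UploadResult string

// produce stands in for listFiles: it sends every name, then signals wg.
func produce(names []string, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, n := range names {
		out <- n
	}
}

// compress stands in for compressFile; the range loop ends when in is closed.
func compress(in <-chan string, out chan<- CompressionStatus, wg *sync.WaitGroup) {
	defer wg.Done()
	for name := range in {
		out <- CompressionStatus{file: name, compressed: name + ".compressed"}
	}
}

// upload stands in for uploadToAll; it emits one result per region.
func upload(in <-chan CompressionStatus, out chan<- UploadResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for c := range in {
		for _, region := range []string{"US", "IND"} {
			out <- UploadResult(region + ":" + c.compressed)
		}
	}
}

// runPipeline wires the three stages and returns the sorted results.
// Each channel is closed by a helper goroutine once all of its writers are done.
func runPipeline(sources [][]string, compressors, uploaders int) []string {
	fileJobs := make(chan string)
	compressed := make(chan CompressionStatus)
	results := make(chan UploadResult)

	var wgList, wgCompress, wgUpload sync.WaitGroup

	wgList.Add(len(sources))
	for _, s := range sources {
		go produce(s, fileJobs, &wgList)
	}
	go func() {
		wgList.Wait()
		close(fileJobs) // no more writers: compress loops can finish
	}()

	wgCompress.Add(compressors)
	for i := 0; i < compressors; i++ {
		go compress(fileJobs, compressed, &wgCompress)
	}
	go func() {
		wgCompress.Wait()
		close(compressed) // no more writers: upload loops can finish
	}()

	wgUpload.Add(uploaders)
	for i := 0; i < uploaders; i++ {
		go upload(compressed, results, &wgUpload)
	}
	go func() {
		wgUpload.Wait()
		close(results) // lets the draining loop below terminate
	}()

	// Drain the final channel in the caller.
	var got []string
	for r := range results {
		got = append(got, string(r))
	}
	sort.Strings(got) // arrival order is nondeterministic, so sort for stable output
	return got
}

func main() {
	got := runPipeline([][]string{{"a.png", "b.png"}, {"c.mp4"}}, 6, 3)
	fmt.Println(len(got), got)
}
```

With three input names and two regions this yields six results. As for the worker count, 6 compressors on 8 cores is a reasonable starting point, but that is something to measure rather than assume.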
Upvotes: 2