Reputation:
I have this snippet of code which concurrently runs a function using an input and output channel and associated WaitGroups, but I was clued in to the fact that I've done some things wrong. Here's the code:
func main() {
concurrency := 50
var tasksWG sync.WaitGroup
tasks := make(chan string)
output := make(chan string)
for i := 0; i < concurrency; i++ {
tasksWG.Add(1)
// evidentally because I'm processing tasks in a groutine then I'm not blocking and I end up closing the tasks channel almost immediately and stopping tasks from executing
go func() {
for t := range tasks {
output <- process(t)
continue
}
tasksWG.Done()
}()
}
var outputWG sync.WaitGroup
outputWG.Add(1)
go func() {
for o := range output {
fmt.Println(o)
}
outputWG.Done()
}()
go func() {
// because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
tasksWG.Wait()
close(output)
}()
f, err := os.Open(os.Args[1])
if err != nil {
log.Panic(err)
}
s := bufio.NewScanner(f)
for s.Scan() {
tasks <- s.Text()
}
close(tasks)
// and finally the output wait group finishes almost immediately as well because tasks gets closed right away due to my improper use of goroutines
outputWG.Wait()
}
func process(t string) string {
time.Sleep(3 * time.Second)
return t
}
I've indicated in the comments where I've implementing things wrong. Now these comments make sense to me. The funny thing is that this code does indeed seem to run asynchronously and dramatically speeds up execution. I want to understand what I've done wrong but it's hard to wrap my head around it when the code seems to execute in an asynchronous way. I'd love to understand this better.
Upvotes: 1
Views: 78
Reputation: 1311
Your main goroutine is doing a couple of things sequentially and others concurrently, so I think your order of execution is off
f, err := os.Open(os.Args[1])
if err != nil {
log.Panic(err)
}
s := bufio.NewScanner(f)
for s.Scan() {
tasks <- s.Text()
}
Shouldn't you move this up top? So then you have values sent to tasks
THEN have your loop which ranges over tasks 50 times in the concurrency named for loop (you want to have something in tasks before calling code that ranges over it)
go func() {
// because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
tasksWG.Wait()
close(output)
}()
The logic here is confusing me, you're spawning a goroutine to wait on the waitgroup, so here the wait is nonblocking on the main goroutine - is that what you want to do? It won't wait for tasksWG to be decremented to zero inside main, it'll do that inside the goroutine that you've created. I don't believe you want to do that?
It might be easier to debug if you could give more details on the expected output?
Upvotes: 1