user17699298

Code executes successfully even though there are issues with WaitGroup implementation

I have this snippet of code which concurrently runs a function using an input and output channel and associated WaitGroups, but I was clued in to the fact that I've done some things wrong. Here's the code:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"
    "time"
)

func main() {
    concurrency := 50
    var tasksWG sync.WaitGroup
    tasks := make(chan string)
    output := make(chan string)

    for i := 0; i < concurrency; i++ {
        tasksWG.Add(1)

        // evidently, because I'm processing tasks in a goroutine, I'm not blocking, so I end up closing the tasks channel almost immediately and stopping tasks from executing
        go func() {
            for t := range tasks {
                output <- process(t)
                continue
            }
            tasksWG.Done()
        }()
    }

    var outputWG sync.WaitGroup
    outputWG.Add(1)
    go func() {
        for o := range output {
            fmt.Println(o)
        }
        outputWG.Done()
    }()

    go func() {
        // because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
        tasksWG.Wait()
        close(output)
    }()

    f, err := os.Open(os.Args[1])
    if err != nil {
        log.Panic(err)
    }

    s := bufio.NewScanner(f)
    for s.Scan() {
        tasks <- s.Text()
    }

    close(tasks)
    // and finally the output wait group finishes almost immediately as well because tasks gets closed right away due to my improper use of goroutines
    outputWG.Wait()
}

func process(t string) string {
    time.Sleep(3 * time.Second)
    return t
}

I've indicated in the comments where I've implemented things wrong. Now these comments make sense to me. The funny thing is that this code does indeed seem to run asynchronously and dramatically speeds up execution. I want to understand what I've done wrong, but it's hard to wrap my head around it when the code seems to execute in an asynchronous way. I'd love to understand this better.

Upvotes: 1

Views: 78

Answers (1)

Your main goroutine does some things sequentially and others concurrently, so I think your order of execution is off:

    f, err := os.Open(os.Args[1])
    if err != nil {
        log.Panic(err)
    }

    s := bufio.NewScanner(f)
    for s.Scan() {
        tasks <- s.Text()
    }

Shouldn't you move this to the top, so that values are sent to tasks first?

THEN have the for loop bounded by concurrency, which starts the 50 goroutines that range over tasks (you want to have something in tasks before calling code that ranges over it).


    go func() {
        // because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
        tasksWG.Wait()
        close(output)
    }()

The logic here confuses me: you're spawning a goroutine to wait on the WaitGroup, so the wait does not block the main goroutine. Is that what you want to do? main itself won't wait for tasksWG to be decremented to zero; only the goroutine you've created will. I don't believe you want to do that?


It might be easier to debug if you could give more details on the expected output.

Upvotes: 1
