WoooHaaaa

Reputation: 20450

Golang goroutine doesn't run with channel inside

I'm trying to implement a word count program, but I ran into a problem at the very first step.

Here's my code:

package main

import (
    "fmt"
    "os"
    "bufio"
    "sync"
)

// Load data into channel
func laodData(arr []string,channel chan string,wg sync.WaitGroup) {
    for _,path := range arr {
        file,err := os.Open(path)
        fmt.Println("begin to laodData ", path)
        if err != nil {
            fmt.Println(err)
            os.Exit(-1)
        }
        defer file.Close()
        reader := bufio.NewReaderSize(file, 32*10*1024)
        i := 0
        for {
            line,err := reader.ReadString('\n')
            channel <- line
            if err != nil {
                break
            }
            i++
            if i%200 == 0 {
                fmt.Println(i," lines parsed")
            }
        }
        fmt.Println("finish laodData ", path)
    }
    wg.Done()
}

// dispatch data lines into different mappers
func dispatcher(channel chan string,wg sync.WaitGroup){
    fmt.Println("pull data 11")
    line,ok := <- channel
    fmt.Println(ok)
    for ok {
        fmt.Println(line)
        line,ok = <- channel
    }
    fmt.Println("pull data 22")
    wg.Done()
}

func main() {
    path := os.Args
    if len(path) < 2 {
        fmt.Println("Need Input Files")
        os.Exit(0)
    }
    var wg sync.WaitGroup
    wg.Add(2)

    channel := make(chan string)
    defer close(channel)

    fmt.Println("before dispatcher")
    go laodData(path[1:],channel,wg)
    go dispatcher(channel,wg)
    wg.Wait()

    fmt.Println("after dispatcher")
}

And here's my output:

...

finish laodData  result.txt

throw: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x42154100, 0x42154100)
    /usr/local/go/src/pkg/runtime/zsema_amd64.c:146 +0x25
sync.(*WaitGroup).Wait(0x4213b440, 0x0)
    /usr/local/go/src/pkg/sync/waitgroup.go:79 +0xf2
main.main()
    /Users/kuankuan/go/src/mreasy/main.go:66 +0x238

goroutine 2 [syscall]:
created by runtime.main
    /usr/local/go/src/pkg/runtime/proc.c:221

goroutine 4 [chan receive]:
main.dispatcher(0x42115a50, 0x0, 0x2, 0x0)
    /Users/kuankuan/go/src/mreasy/main.go:45 +0x223
created by main.main
    /Users/kuankuan/go/src/mreasy/main.go:65 +0x228
exit status 2

Thanks!

Upvotes: 6

Views: 2560

Answers (3)

Siva Subramanian

Reputation: 11

Two issues need to be fixed in the code from the question.

  1. Close the channel once you're done sending all the data. In laodData, call close(channel) after the last send.
  2. Pass the sync.WaitGroup by pointer. You are currently passing wg by value to both laodData and dispatcher.

Fixing these two issues resolves the deadlock. The reasons for the deadlock are:

  • The channel is never closed while the goroutines are running, so the receive loop in dispatcher blocks forever waiting for another value.
  • sync.WaitGroup is passed by value, so each function gets its own copy. Calling wg.Done() on a copy does not decrement the counter that main is waiting on, so wg.Wait() never returns.
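The two fixes above can be sketched as follows. This is a minimal illustration, not the asker's full program: it sends lines from an in-memory slice instead of reading files, and the names produce, consume and run are made up for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// produce sends every line on out, then closes out so the receiver's
// loop can end. The WaitGroup is taken by pointer so that Done acts on
// the caller's counter rather than on a copy.
func produce(lines []string, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(out) // deferred calls run last-in, first-out: close happens before Done
	for _, line := range lines {
		out <- line
	}
}

// consume receives until in is closed and counts the lines it saw.
func consume(in <-chan string, count *int, wg *sync.WaitGroup) {
	defer wg.Done()
	for line := range in {
		fmt.Println(line)
		*count++
	}
}

// run wires one producer to one consumer (hypothetical helper) and
// returns the number of lines consumed.
func run(lines []string) int {
	var wg sync.WaitGroup
	wg.Add(2)
	ch := make(chan string)
	n := 0
	go produce(lines, ch, &wg) // note &wg, not wg
	go consume(ch, &n, &wg)
	wg.Wait()
	return n
}

func main() {
	fmt.Println("lines consumed:", run([]string{"a b", "c d", "e"}))
}
```

If the sender closes the channel like this, the `defer close(channel)` in the original main() has to go, because closing an already closed channel panics.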

Upvotes: 1

Thomas Kappler

Reputation: 4115

I modified your example to run on the Go playground where there's no file I/O; it sends random numbers on the channel instead.

@Victor Deryagin's explanation and his suggestion of using a "done" channel are correct. The reason you get a deadlock is that your goroutine sends on channel but nothing receives from it, so the send blocks forever. In the above link I added a consumer goroutine; with it, the program runs concurrently as intended.

Note that to wait for several goroutines, it's clearer and easier to use sync.WaitGroup.
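As a rough sketch of waiting for several goroutines with sync.WaitGroup: the example below starts one sender per batch of numbers and closes the channel only after all of them have finished. It uses fixed integers rather than random numbers or file contents, and sumAll is a hypothetical name chosen for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumAll starts one sender goroutine per batch and returns the sum of
// every value received on the shared channel.
func sumAll(batches [][]int) int {
	ch := make(chan int)
	var wg sync.WaitGroup

	for _, batch := range batches {
		wg.Add(1)
		go func(nums []int) {
			defer wg.Done()
			for _, n := range nums {
				ch <- n
			}
		}(batch)
	}

	// Close the channel only after every sender is done, so the
	// receive loop below knows when to stop.
	go func() {
		wg.Wait()
		close(ch)
	}()

	total := 0
	for n := range ch {
		total += n
	}
	return total
}

func main() {
	fmt.Println(sumAll([][]int{{1, 2, 3}, {10, 20}, {100}}))
}
```

With several senders, no single sender can safely close the channel, which is why the close sits in a separate goroutine behind wg.Wait().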

Upvotes: 2

Victor Deryagin

Reputation: 12215

The program terminates when the main goroutine exits, so dispatcher() has no time to do anything. You need to block in main() until dispatcher() completes. A channel can be used for this:

package main

import (
    "fmt"
    "os"
    "bufio"
)

var done = make(chan bool)             // create channel

// Load files and send their lines into a channel for the mappers to read.
func dispatcher(arr []string,channel chan string) {
    for _,path := range arr {
        file,err := os.Open(path)
        fmt.Println("begin to dispatch ", path)
        if err != nil {
            fmt.Println(err)
            os.Exit(-1)
        }
        defer file.Close()
        reader := bufio.NewReaderSize(file, 32*10*1024)
        i := 0
        for {
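            line,err := reader.ReadString('\n')
            if line != "" {
                channel <- line    // send the line (the last one may lack '\n')
            }
            if err != nil {
                break              // io.EOF or a read error: stop reading this file
            }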
            i++
            if i%200 == 0 {
                fmt.Println(i," lines parsed")
            }
        }
        fmt.Println("finish dispatch ", path)
    }
    done <- true                 // notify main() of completion
}

func main() {
    path := os.Args
    if len(path) < 2 {
        fmt.Println("Need Input Files")
        os.Exit(0)
    }
    channel := make(chan string)
    fmt.Println("before dispatcher")
    go dispatcher(path[1:],channel)
    <-done                 // wait for dispatcher()
    fmt.Println("after dispatcher")
}
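For the "done" channel to help, something also has to receive from the data channel, otherwise the first send blocks. The following is a minimal sketch of that combination under a few assumptions: it reads from an in-memory string via strings.NewReader instead of files, the done channel carries a line count rather than a bool, and sendLines, countLines and run are illustrative names.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// sendLines reads r line by line, sends each line on out, and closes
// out when the input is exhausted.
func sendLines(r *bufio.Reader, out chan<- string) {
	defer close(out)
	for {
		line, err := r.ReadString('\n')
		if line != "" {
			out <- line
		}
		if err != nil {
			return // io.EOF or a read error
		}
	}
}

// countLines drains in and reports the number of lines on done.
func countLines(in <-chan string, done chan<- int) {
	n := 0
	for range in {
		n++
	}
	done <- n // notify the caller of completion
}

// run connects the sender and the counter and blocks until the
// counter signals on done.
func run(text string) int {
	lines := make(chan string)
	done := make(chan int)
	go sendLines(bufio.NewReader(strings.NewReader(text)), lines)
	go countLines(lines, done)
	return <-done // wait for countLines()
}

func main() {
	fmt.Println(run("one\ntwo\nthree"))
}
```

Here main blocks on the consumer's signal rather than the producer's, so it cannot exit before the last line has been processed.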

Upvotes: 9
