raspi

Reputation: 6122

How to recursively list files with a channel in Go?

I'm trying to use channels to list directory trees recursively.

Currently I get a list of a few files, and then it gets stuck on one directory. The directory is sent to a worker, but the worker never processes it.

How should the directory be sent back into the worker (in the file.IsDir() branch) so that it gets properly processed, and how do I notify the file lister that there are no new files to process once the recursion is done?

Here's my current attempt:

package main

import (
    "log"
    "os"
    "path/filepath"
)

// Job for worker
type workerJob struct {
    Root string
}

// Result of a worker
type workerResult struct {
    Filename string
}

func worker(jobs chan workerJob, results chan<- workerResult, done chan bool) {
    for j := range jobs {
        log.Printf(`Directory: %#v`, j.Root)

        dir, err := os.Open(j.Root)

        if err != nil {
            if os.IsPermission(err) {
                // Skip if there's no permission
                continue
            }
            continue
        }

        fInfo, err := dir.Readdir(-1)
        dir.Close()
        if err != nil {
            if os.IsPermission(err) {
                // Skip if there's no permission
                continue
            }
            continue
        }

        for _, file := range fInfo {
            fpath := filepath.Join(dir.Name(), file.Name())

            if file.Mode().IsRegular() {
                // is file
                fs := uint64(file.Size())
                if fs == 0 {
                    // Skip zero sized
                    continue
                }

                r := workerResult{
                    Filename: fpath,
                }

                log.Printf(`sent result: %#v`, r.Filename)
                results <- r
            } else if file.IsDir() {
                // Send directory to be processed by the worker
                nj := workerJob{
                    Root: fpath,
                }
                log.Printf(`sent new dir job: %#v`, nj.Root)
                jobs <- nj
            }
        }

        done <- true
    }
}

func main() {
    dir := `/tmp`

    workerCount := 1

    jobs := make(chan workerJob, workerCount)
    results := make(chan workerResult)
    readDone := make(chan bool)

    // start N workers
    for i := 0; i < workerCount; i++ {
        go worker(jobs, results, readDone)
    }

    jobs <- workerJob{
        Root: dir,
    }

    readloop:
    for {
        select {
        case res := <-results:
            log.Printf(`result=%#v`, res.Filename)
        case _ = <-readDone:
            log.Printf(`got stop`)
            break readloop
        }
    }

}

This results in:

2018/07/12 14:37:29 Directory: "/tmp"
2018/07/12 14:37:29 sent result: "/tmp/.bashrc"
2018/07/12 14:37:29 result="/tmp/.bashrc"
2018/07/12 14:37:29 sent result: "/tmp/.bash_profile"
2018/07/12 14:37:29 result="/tmp/.bash_profile"
2018/07/12 14:37:29 sent result: "/tmp/.bash_logout"
2018/07/12 14:37:29 result="/tmp/.bash_logout"
2018/07/12 14:37:29 sent result: "/tmp/.xinitrc"
2018/07/12 14:37:29 result="/tmp/.xinitrc"
2018/07/12 14:37:29 sent new dir job: "/tmp/.config"
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
main.main()
    +0x281

goroutine 5 [chan send]:
main.worker(0xc42005a060, 0xc420078060, 0xc4200780c0)
    +0x4e7
created by main.main
    +0x109

Process finished with exit code 2

How can the deadlock be fixed?

Upvotes: 2

Views: 1255

Answers (2)

leaf bebop

Reputation: 8222

You have noticed that jobs <- nj hangs forever. That is because the send blocks until a worker receives it in the range loop, and as long as the worker is blocked on the send, it can never get back to the range loop.

To solve the problem, spawn a new goroutine to do the send:

go func() {
        jobs <- nj
}()

And there is one more problem: your readDone channel.

Currently a value is sent on that channel every time a worker finishes a job, so it is possible (select picks among ready channels randomly) that the select in func main() receives it and shuts the system down while jobs and results are still pending, losing them.

To solve this part of the problem, use a sync.WaitGroup. Every time you add a new job, call wg.Add(1), and every time a worker finishes a job, call wg.Done(). In func main(), spawn a goroutine that uses wg.Wait() to wait for all jobs to finish and then shuts the system down via readDone.

// One initial job
wg.Add(1)
go func() {
    jobs <- workerJob{
        Root: dir,
    }
}()

// When all jobs finished, shutdown the system.
go func() {
    wg.Wait()
    readDone <- true
}()
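
For completeness, here is a sketch of how the worker itself might be wired to the WaitGroup. It assumes a shared var wg sync.WaitGroup declared next to the job/result types and drops the done channel in favour of wg.Done(); those details are my assumptions, the exact working version is in the playground link below.

// Sketch of the revised worker, assuming a package-level `var wg sync.WaitGroup`.
func worker(jobs chan workerJob, results chan<- workerResult) {
    for j := range jobs {
        dir, err := os.Open(j.Root)
        if err != nil {
            wg.Done() // the job is finished even if we could not open it
            continue
        }

        fInfo, err := dir.Readdir(-1)
        dir.Close()
        if err != nil {
            wg.Done()
            continue
        }

        for _, file := range fInfo {
            fpath := filepath.Join(dir.Name(), file.Name())

            if file.Mode().IsRegular() {
                if file.Size() == 0 {
                    continue // skip zero-sized files
                }
                results <- workerResult{Filename: fpath}
            } else if file.IsDir() {
                nj := workerJob{Root: fpath}
                wg.Add(1) // register the new job before handing it off
                go func() {
                    jobs <- nj // send from a goroutine so the worker never blocks on itself
                }()
            }
        }

        wg.Done() // this directory job is fully processed
    }
}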

Full code: https://play.golang.org/p/KzVxtflu1eU

Upvotes: 4

Nick Hunter

Reputation: 74

Initial comments on improving the code

Tim's comment doesn't seem to touch the essentials. It shouldn't matter that you close the channels at the end of main(), and neither should it matter that your select statement has a default case. If there's a message on the channel, the channel-reading case will run.

It could be considered a problem that, while there's no message, you'll spin the loop repeatedly through the default case, which causes a spike in CPU usage ("busy-waiting"), so yes, probably just remove the default case.

You might also add a case for a "stop" channel which breaks the for loop, using a label (this is needed since otherwise break just breaks from the select statement and we loop again):

readloop:
for {
    select {
    case res := <-results:
        log.Printf(`result=%#v`, res.Filename)
    case <-stopChan:
        break readloop
    }
}

Finally, you should probably also rename the variable f in worker() to dir, since it's a directory and not a file. It just makes the code easier to read; code should read almost like natural language to a programmer fluent in the language. That way, this statement,

fpath := filepath.Join(f.Name(), file.Name())

becomes

fpath := filepath.Join(dir.Name(), file.Name())

... which is a lot easier for your eyes/brain to scan.

Why is your code broken

You have a channel deadlock. You didn't notice it because of the default case, which means that technically one goroutine can always make "progress". Otherwise the runtime would have thrown a fatal error saying:

fatal error: all goroutines are asleep - deadlock!

This is because worker() has the following structure:

receive from channel
...
    ...
    foreach dir in root:
        send to channel
    ...
...

But on a normal channel, both send and receive are blocking operations. The goroutine that sends/receives won't make progress until its partner shows up.
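
As a standalone illustration (not from the original post), the same fatal error can be reproduced with nothing more than an unbuffered channel and no receiver:

package main

func main() {
    ch := make(chan int) // unbuffered: a send blocks until someone receives

    // No other goroutine ever receives from ch, so this send blocks forever
    // and the runtime reports:
    //   fatal error: all goroutines are asleep - deadlock!
    ch <- 1
}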

You could use a buffered channel to avoid this, but it's impossible to know in advance how many subdirectories will be found, so the buffer may be too small. Instead, I suggest spawning a goroutine so that the send can block without stalling the overall worker() loop:

go func() {
    for _, file := range fInfo {
        ...
    }
}()
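
One way that skeleton might be filled in, reusing the loop body from the question (fInfo, dir, results, and jobs are the names already in worker(); treat this as a sketch rather than the definitive fix):

go func() {
    for _, file := range fInfo {
        fpath := filepath.Join(dir.Name(), file.Name())

        if file.Mode().IsRegular() {
            if file.Size() == 0 {
                continue // skip zero-sized files
            }
            results <- workerResult{Filename: fpath}
        } else if file.IsDir() {
            // Safe to block here: we are off the main worker loop.
            jobs <- workerJob{Root: fpath}
        }
    }
}()

Note that once the inner loop runs in its own goroutine, you still need something like the sync.WaitGroup from the other answer to know when all directories have actually been processed before signalling shutdown.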

Upvotes: 1
