如何在Go中递归列出带有频道的文件?

问题描述 投票:2回答:2

我正在尝试使用通道递归列出目录树。

目前我得到一些文件列表然后它被卡在一个目录上。目录将发送给工作人员,但不会处理它。

如何在worker(if file.IsDir())中发送目录以便正确处理它并且还通知文件lister在递归完成后没有新文件要处理?

这是我目前的尝试:

package main

import (
    "fmt"
    "os"
    "path/filepath"
    "errors"
    "log"
)

// Job for worker
type workerJob struct {
    Root string
}

// Result of a worker
type workerResult struct {
    Filename string
}

func worker(jobs chan workerJob, results chan<- workerResult, done chan bool) {
    for j := range jobs {
        log.Printf(`Directory: %#v`, j.Root)

        dir, err := os.Open(j.Root)

        if err != nil {
            if os.IsPermission(err) {
                // Skip if there's no permission
                continue
            }
            continue
        }

        fInfo, err := dir.Readdir(-1)
        dir.Close()
        if err != nil {
            if os.IsPermission(err) {
                // Skip if there's no permission
                continue
            }
            continue
        }

        for _, file := range fInfo {
            fpath := filepath.Join(dir.Name(), file.Name())

            if file.Mode().IsRegular() {
                // is file
                fs := uint64(file.Size())
                if fs == 0 {
                    // Skip zero sized
                    continue
                }

                r := workerResult{
                    Filename: fpath,
                }

                log.Printf(`sent result: %#v`, r.Filename)
                results <- r
            } else if file.IsDir() {
                // Send directory to be processed by the worker
                nj := workerJob{
                    Root: fpath,
                }
                log.Printf(`sent new dir job: %#v`, nj.Root)
                jobs <- nj
            }
        }

        done <- true
    }
}

func main() {
    dir := `/tmp`

    workerCount := 1

    jobs := make(chan workerJob, workerCount)
    results := make(chan workerResult)
    readDone := make(chan bool)

    // start N workers
    for i := 0; i < workerCount; i++ {
        go worker(jobs, results, readDone)
    }

    jobs <- workerJob{
        Root: dir,
    }

    readloop:
    for {
        select {
        case res := <-results:
            log.Printf(`result=%#v`, res.Filename)
        case _ = <-readDone:
            log.Printf(`got stop`)
            break readloop
        }
    }

}

这导致:

2018/07/12 14:37:29 Directory: "/tmp"
2018/07/12 14:37:29 sent result: "/tmp/.bashrc"
2018/07/12 14:37:29 result="/tmp/.bashrc"
2018/07/12 14:37:29 sent result: "/tmp/.bash_profile"
2018/07/12 14:37:29 result="/tmp/.bash_profile"
2018/07/12 14:37:29 sent result: "/tmp/.bash_logout"
2018/07/12 14:37:29 result="/tmp/.bash_logout"
2018/07/12 14:37:29 sent result: "/tmp/.xinitrc"
2018/07/12 14:37:29 result="/tmp/.xinitrc"
2018/07/12 14:37:29 sent new dir job: "/tmp/.config"
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
main.main()
    +0x281

goroutine 5 [chan send]:
main.worker(0xc42005a060, 0xc420078060, 0xc4200780c0)
    +0x4e7
created by main.main
    +0x109

Process finished with exit code 2

如何修复僵局?

go recursion concurrency channel
2个回答
2
投票

你注意到jobs <- nj永远挂了。这是因为操作阻塞直到一个工人在range循环中接收,并且只要它在那里阻塞,它就不能到达range循环。

为了解决这个问题,你会产生一个新的goroutine来做到这一点。

go func() {
        jobs <- nj
}()

还有一个问题:你的readDone频道。

目前,每当你的worker完成一项工作时,这个频道就会被发出,这导致select随机选择准备频道的可能性select中的func main()选择它然后关闭系统,这使得所有剩余的工作和结果都丢失了。

要解决这部分问题,你应该使用sync.WaitGroup。每次添加新工作时,都会调用wg.Add(1),每次工作完成工作时,都会调用wg.Done()。在func main()中,你将产生一个使用wg.Wait()等待所有工作完成然后使用readDone关闭系统的goroutine。

// One initial job
wg.Add(1)
go func() {
    jobs <- workerJob{
        Root: dir,
    }
}()

// When all jobs finished, shutdown the system.
go func() {
    wg.Wait()
    readDone <- true
}()

完整代码:https://play.golang.org/p/KzVxtflu1eU


1
投票

关于改进代码的初步评论

蒂姆的评论似乎没有触及要领。在main()结束时关闭通道并不重要,并且你的select声明有default情况也不重要。如果频道上有消息,则频道读取案例将运行。

这可能被认为是一个问题,虽然没有消息,你将通过default案例反复旋转循环,这将导致CPU使用率激增(“忙等待”),所以是的,可能只是删除默认情况。

您还可以添加一个“停止”通道的情况,该通道使用标签打破for循环(这是必需的,否则break只是从select语句中断而我们再次循环):

readloop:
for {
    select {
    case res := <-results:
        log.Printf(`result=%#v`, res.Filename)
    case _ = <-stopChan:
        break readloop
}

最后,您还应该将f中的变量worker()重命名为dir,因为它是一个目录而不是文件。只是让代码更容易阅读。对于熟练使用该语言的程序员来说,代码应该像自然语言一样阅读。就这样,这句话,

fpath := filepath.Join(f.Name(), file.Name())

fpath := filepath.Join(dir.Name(), file.Name())

......你的眼睛/大脑更容易扫描。

为什么你的代码被破坏了

你有一个频道死锁。您没有注意到因为default案件意味着从技术上讲,一个goroutine总是能够“进步”。否则运行时会引起恐慌:

fatal error: all goroutines are asleep - deadlock!

这是因为worker()具有以下结构:

receive from channel
...
    ...
    foreach dir in root:
        send to channel
    ...
...

但在普通频道上,发送和接收都是阻塞操作。发送/接收的goroutine在其合作伙伴出现之前不会取得进展。

您可以使用缓冲通道来避免这种情况,但事先无法知道目录中将找到多少目录,因此缓冲区可能太小。我建议产生一个goroutine,以便它可以阻止而不影响整个worker()循环:

go func() {
    for _, file := range fInfo {
        ...
    }
}()
© www.soinside.com 2019 - 2024. All rights reserved.