问题 在Golang中同时读取文件


读取部分不是并发的,而是处理。我用这种方式表达了标题,因为我最有可能使用该短语再次搜索此问题。 :)

我试图陷入僵局 超越例子 所以这对我来说是一次学习经历。我的目标是这些:

  1. 逐行读取文件(最终使用缓冲区来完成行组)。
  2. 将文本传递给 func() 做一些正则表达式的工作。
  3. 将结果发送到某处但避免使用互斥锁或共享变量。我正在向通道发送整数(总是数字1)。这有点愚蠢,但如果它没有引起问题,我想把它留下来,除非你们有一个更整洁的选择。
  4. 使用工作池来执行此操作。我不确定如何告诉工人们自己重新排队?

这里是 游乐场链接。我试着写有用的评论,希望这是有道理的。我的设计可能完全错误,所以不要犹豫重构。

package main

import (
  "bufio"
  "fmt"
  "regexp"
  "strings"
  "sync"
)

func telephoneNumbersInFile(path string) int {
  file := strings.NewReader(path)

  var telephone = regexp.MustCompile(`\(\d+\)\s\d+-\d+`)

  // do I need buffered channels here?
  jobs := make(chan string)
  results := make(chan int)

  // I think we need a wait group, not sure.
  wg := new(sync.WaitGroup)

  // start up some workers that will block and wait?
  for w := 1; w <= 3; w++ {
    wg.Add(1)
    go matchTelephoneNumbers(jobs, results, wg, telephone)
  }

  // go over a file line by line and queue up a ton of work
  scanner := bufio.NewScanner(file)
  for scanner.Scan() {
    // Later I want to create a buffer of lines, not just line-by-line here ...
    jobs <- scanner.Text()
  }

  close(jobs)
  wg.Wait()

  // Add up the results from the results channel.
  // The rest of this isn't even working so ignore for now.
  counts := 0
  // for v := range results {
  //   counts += v
  // }

  return counts
}

func matchTelephoneNumbers(jobs <-chan string, results chan<- int, wg *sync.WaitGroup, telephone *regexp.Regexp) {
  // Decreasing internal counter for wait-group as soon as goroutine finishes
  defer wg.Done()

  // eventually I want to have a []string channel to work on a chunk of lines not just one line of text
  for j := range jobs {
    if telephone.MatchString(j) {
      results <- 1
    }
  }
}

func main() {
  // An artificial input source.  Normally this is a file passed on the command line.
  const input = "Foo\n(555) 123-3456\nBar\nBaz"
  numberOfTelephoneNumbers := telephoneNumbersInFile(input)
  fmt.Println(numberOfTelephoneNumbers)
}

4543
2017-11-30 19:46


起源



答案:


你几乎就在那里,只需要一些关于goroutines同步的工作。您的问题是您正在尝试提供解析器并在同一例程中收集结果,但这是无法完成的。

我建议如下:

  1. 在单独的例程中运行扫描程序,一旦读取所有内容,关闭输入通道。
  2. 运行单独的例程,等待解析器完成其工作,而不是关闭输出通道。
  3. 收集主程序中的所有结果。

相关更改可能如下所示:

// Go over a file line by line and queue up a ton of work
go func() {
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        jobs <- scanner.Text()
    }
    close(jobs)
}()

// Collect all the results...
// First, make sure we close the result channel when everything was processed
go func() {
    wg.Wait()
    close(results)
}()

// Now, add up the results from the results channel until closed
counts := 0
for v := range results {
    counts += v
}

在操场上完全运作的例子: http://play.golang.org/p/coja1_w-fY

值得补充你不一定需要 WaitGroup 要实现同样的目标,您需要知道的是何时停止接收结果。这可以通过扫描仪广告(在频道上)读取多少行然后收集器只读取指定数量的结果(尽管你也需要发送零)来实现。


12
2017-11-30 20:37



惊人。是的,这适用于100mb的文本文件。谢谢!那么,当主要计算结果时,关闭(结果)恰好不会触发吗?我将不得不放入一些打印件来查看它在运行时是如何工作的。 - squarism
我们产生了一个单独的例程,等待着 WaitGroup 完成然后关闭结果通道。这是在背景中发生的(我们使用过 go)。请注意,我们立即开始收集结果(而 我们读的文件不是 后 阅读它)。您可以在调用后执行此操作,而不是生成用于关闭结果通道的单独例程 close(jobs) 同样 - 换句话说:读取所有行,等待解析器完成,关闭结果通道。有很多可能性。 - tomasz


答案:


你几乎就在那里,只需要一些关于goroutines同步的工作。您的问题是您正在尝试提供解析器并在同一例程中收集结果,但这是无法完成的。

我建议如下:

  1. 在单独的例程中运行扫描程序,一旦读取所有内容,关闭输入通道。
  2. 运行单独的例程,等待解析器完成其工作,而不是关闭输出通道。
  3. 收集主程序中的所有结果。

相关更改可能如下所示:

// Go over a file line by line and queue up a ton of work
go func() {
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        jobs <- scanner.Text()
    }
    close(jobs)
}()

// Collect all the results...
// First, make sure we close the result channel when everything was processed
go func() {
    wg.Wait()
    close(results)
}()

// Now, add up the results from the results channel until closed
counts := 0
for v := range results {
    counts += v
}

在操场上完全运作的例子: http://play.golang.org/p/coja1_w-fY

值得补充你不一定需要 WaitGroup 要实现同样的目标,您需要知道的是何时停止接收结果。这可以通过扫描仪广告(在频道上)读取多少行然后收集器只读取指定数量的结果(尽管你也需要发送零)来实现。


12
2017-11-30 20:37



惊人。是的,这适用于100mb的文本文件。谢谢!那么,当主要计算结果时,关闭(结果)恰好不会触发吗?我将不得不放入一些打印件来查看它在运行时是如何工作的。 - squarism
我们产生了一个单独的例程,等待着 WaitGroup 完成然后关闭结果通道。这是在背景中发生的(我们使用过 go)。请注意,我们立即开始收集结果(而 我们读的文件不是 后 阅读它)。您可以在调用后执行此操作,而不是生成用于关闭结果通道的单独例程 close(jobs) 同样 - 换句话说:读取所有行,等待解析器完成,关闭结果通道。有很多可能性。 - tomasz


编辑: @tomasz上面的答案是正确的。请忽略这个答案。

你需要做两件事:

  1. 使用缓冲的chan,以便发送不会阻止
  2. 关闭结果陈,以便接收不会阻止。

使用缓冲通道是必不可少的,因为无缓冲通道需要为每次发送接收,这会导致您遇到的死锁。

如果您解决了这个问题,当您尝试接收结果时,您将遇到死锁,因为结果尚未关闭。

这是固定的游乐场: http://play.golang.org/p/DtS8Matgi5


1
2017-11-30 20:07



哎呀。我太近了。我想我尝试了这些东西,但不是两个。显然,我不喜欢这个。谢谢!我会研究这个。 - squarism
Go并不容易,并发也不容易:-)需要时间。 - jjm
我担心这是不正确的。如果您在示例中添加了超过10行的电话号码,那么您将陷入僵局。您不需要缓冲通道,您需要在等待组完成时继续收集结果(然后例如关闭 result 渠道)。 - tomasz
@tomasz这是一个更好的解决方案 - jjm
别 使用缓冲来解决僵局 - 这太冒险了。事实上,我的经验法则是从一开始 零 缓冲,以便最有可能发生死锁。然后我想出如何安排goroutines和渠道不要死锁。然后,如果我需要作为优化步骤,我可以添加缓冲。 - Rick-777