golang并发编程 - 例子解析
最近在看《Programming in Go》, 其中关于并发编程写得很不错, 受益非浅, 其中有一些例子是需要多思考才能想明白的, 所以我打算记录下来, 强化一下思路
《Programming in Go》在 Chapter 7. Concurrent Programming
里面一共用3个例子来讲述并发编程的3个模式, 第一个是 filter
, 筛选出后缀名和文件大小文件列表, 还算简单就不说, 然后第二个是升级版, 正则版 filter
, 不同的是他是根据正则搜索出文件的文本并且列出来. 这个例子我起初看是有点蒙的, 这样写是没错, 但是为什么要这样写, 他的设计思路是什么, 和其他方法相比他有什么优势, 这些都不清楚, 于是决定好好分析一下. 实际上这个例子实现的功能并不复杂, 所以我的文章实际上是在讨论怎么产生出和作者相似的思路.
如果不考虑用 goroutine 的话, 思路其实很简单:
1. 列出文件列表, 编译正则.
2. 遍历文件, 打开并遍历每行, 如果正则能匹配, 记录下来.
3. 列出来.
如果用 goroutine , 就会有以下思路:
1. 在得到文件路径数组之后, 分发任务给N个核.
2. 每个核负责打开文件, 将符合条件的那行文本写入到 `channel`
3. 主线程等待并接收`channel`的结果. 显示出来, 完毕
** 然后下文才是重点 **
在go中, channel 是不会自动关闭的, 所以需要在我们使用完之后手动去关闭, 而且如果使用for语法来遍历channel每次得到的数据, 如果channel没有关闭的话会陷入死循环. 在 goruntine 中会造成 deadlock
for job := range jobs {
fmt.Println(job)
}
如果没close, 会触发dead lock. 因为for...range...会自动阻塞直到读取到数据或者channel关闭, 没close的话就会导致整个channel处于睡眠状态. channel关闭后, 就不允许写入(缓冲的数据还在, 还可以读取), 所以, channel 关闭的时机很重要.
我所知道任务分发方法有两种:
第一种是固定分配, 如果说我想计算1+2+3+...+100, 然后分成4份, 也就是
1+2+..+25
,...
,...
,86+87+...+100
, 然后再将结果累加起来.
还有一种是抢占式的, 这里需要使用一个队列, 将所有任务写入队列, 然后开N个goroutine, 每个goroutine从队列读取任务(要确保线程安全), 处理, 完成后再继续读取任务. 不再是固定分配, 自己那份做完了就休息了, 所以看来第二种要好一点.
采用第二种方式的话, 对应go的做法, 那就是使用一个channel, 命名为 jobs
, 将所有的任务写入进去, 写入完毕之后关闭这个 channel, 当然, 因为是N核, 系统能同时处理的任务我们设置为N个(也就是我们使用了N个goruntine), 那么声明 jobs
是缓冲区长度为N的 channel.
Buffered channel
和普通的 channel 的差别是他可以同时容纳多个单位数据, 当缓存的数据单位数量等于 channel 容量的时候, 再执行写入将会阻塞, 否则都是及时处理的.
当我们将数据处理后, 就需要将结果收集起来. 需要注意的是, 这些操作不是在主 goruntine 执行, 所以我们需要通过 channel 传递给主 goruntine . 所以只需要在外部声明一个名为 results
的 channel . 然后在主 goruntine 通过 for
来显示, 这时候就会发现一个问题, 这个 results
关闭的时机问题. 正确的关闭时机是写入所有的 Result
之后. 但是别忘了我们同时开了多个 goruntine , 所以 results
应该在 执行任务的 goruntine 完成信号累计到N个
这个时机关闭. 所以我们再引入一个名叫 done
的 channel 来解决. 每个 goruntine 发送完 result 后会写入一次done, 然后我们就可以遍历 done , 遍历之后说明全部完成了, 再执行显示.
Result 的数据结构
type Result struct {
filename string
lino int
line string
}
书中的 cgrep1
就是这样的
func awaitCompletion(done <-chan struct{}, results chan Result) {
for i := 0; i < workers; i++ {
<-done
}
close(results)
}
但是这样有可能造成死锁, 因为书中 results
缓冲区长度限定为最大1000个, 也就是超过1000个 result 的时候再打算写入 result 会等待取出 result 后才执行, done 也不会写入, 而 awaitCompletion
是等到所有 goruntine 都完成了才会取出 results
, 而且当 result
非常大的时候因为内存的缘故也是不可能一次性取出的. 所以就需要在读取 results
的同时读取 done
, 当读取 done
次数大于 N 后关闭 results
, 所以, 因为要在多个 channel 中同时读取, 所以需要使用 select
.
下面是书中的 cgrep3
, 改进版:
func waitAndProcessResults(timeout int64, done <-chan struct{}, results <-chan Result) {
finish := time.After(time.Duration(timeout))
for working := workers; working > 0; {
select { // Blocking
case result := <-results:
fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
result.line)
case <-finish:
fmt.Println("timed out")
return // Time's up so finish with what results there were
case <-done:
working--
}
}
for {
select { // Nonblocking
case result := <-results:
fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
result.line)
case <-finish:
fmt.Println("timed out")
return // Time's up so finish with what results there were
default:
return
}
}
}
看到这里, 我就有个疑问, 为什么在全部完成之后(done都接收到N个了), 还要再遍历出 results
, 直到读取不到才算读取完成呢(我反应一向比较慢_)? 于是我做了个实验, 去掉了后面再次循环的部分, 发现有时会遗漏掉数据(我用4个测试文件...), 证明这段代码是有用的!!!
我的想法是, 他是在处理完 result, 然后写入 results
, 写完了才发送 done
, 也就是在收到所有的 done 之后, 所有的数据应该是已经处理完成的. 为了验证这个想法, 我写了一下代码:
for working := workers; working > 0; {
select { // Blocking
case result := <-results:
// received result
case <-done:
working--
if working <= 0 {
println(len(results))
}
}
}
然后看到输出的数是大于0的, 也就是说在接收到全部 done 之后, results
还有数据在缓冲区中, 然后在看看发送 result
的代码, 突然就明白了
func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
for job := range jobs {
job.Do(lineRx)
}
done <- struct{}{}
}
我把写入和读取想当然认为一起发生了, 因为有缓冲区的缘故, doJobs在发送进 results
的缓冲区之后就立刻发送 done
了, 但是写入的数据有没有被处理, 是不知道的, 所以在接收到所有 done
之后, results
缓冲区还有数据, 需要再循环一遍.
附我的代码一份:
package main
import (
"fmt"
"os"
"runtime"
"regexp"
"log"
"bufio"
)
type Job struct {
filename string
results chan<- Result
}
type Result struct {
filename string
line string
lino int
}
var worker = runtime.NumCPU()
func main() {
// config cpu number
runtime.GOMAXPROCS(worker)
files := os.Args[2:]
regex, err := regexp.Compile(os.Args[1])
if err != nil {
log.Fatal(err)
return
}
// 任务列表, 并发数目为CPU个数
jobs := make(chan Job, worker)
// 结果
results := make(chan Result, minimum(1000, len(files)))
// 标记完成
dones := make(chan struct{}, worker)
go addJob(files, jobs, results)
for i:=0; i<worker; i++ {
go doJob(jobs, regex, dones)
}
awaitForCloseResult(dones, results)
}
func addJob(files []string, jobs chan<- Job, results chan<- Result) {
for _, filename := range files {
jobs <- Job {filename, results}
}
close(jobs)
}
func doJob(jobs <-chan Job, regex *regexp.Regexp, dones chan<- struct{}) {
for job := range jobs {
job.Do(regex)
}
dones <- struct{}{}
}
func awaitForCloseResult(dones <-chan struct{}, results chan Result) {
working := worker
done := false
for {
select {
case result := <-results:
println(result)
case <-dones:
working -= 1
if working <= 0 {
done = true
}
default:
if done {
return
}
}
}
}
func (j *Job) Do(re *regexp.Regexp) {
f, err := os.Open(j.filename)
if err != nil {
println(err)
return
}
defer f.Close()
b := bufio.NewReader(f)
lino := 0
for {
line, _, err := b.ReadLine()
if re.Match(line) {
j.results <- Result {j.filename, string(line), lino}
}
if err != nil {
break
}
lino += 1
}
}
func minimum(a, b int) int {
if a > b {
return b
}
return a
}
func println(o ...interface{}) {
fmt.Println(o...)
}