06|WaitGroup:协同等待,任务编排利器
文章目录
你好,我是鸟窝。
WaitGroup,我们以前都多多少少学习过,或者是使用过。其实,WaitGroup 很简单,就是 package sync 用来做任务编排的一个并发原语。它要解决的就是并发 - 等待的问题:现在有一个 goroutine A 在检查点(checkpoint)等待一组 goroutine 全部完成,如果在执行任务的这些 goroutine 还没全部完成,那么 goroutine A 就会阻塞在检查点,直到所有 goroutine 都完成后才能继续执行。
我们来看一个使用 WaitGroup 的场景。
比如,我们要完成一个大的任务,需要使用并行的 goroutine 执行三个小任务,只有这三个小任务都完成,我们才能去执行后面的任务。如果通过轮询的方式定时询问三个小任务是否完成,会存在两个问题:一是,性能比较低,因为三个小任务可能早就完成了,却要等很长时间才被轮询到;二是,会有很多无谓的轮询,空耗 CPU 资源。
那么,这个时候使用 WaitGroup 并发原语就比较有效了,它可以阻塞等待的 goroutine。等到三个小任务都完成了,再即时唤醒它们。
其实,很多操作系统和编程语言都提供了类似的并发原语。比如,Linux 中的 barrier、Pthread(POSIX 线程)中的 barrier、C++ 中的 std::barrier、Java 中的 CyclicBarrier 和 CountDownLatch 等。由此可见,这个并发原语还是一个非常基础的并发类型。所以,我们要认真掌握今天的内容,这样就可以举一反三,轻松应对其他场景下的需求了。
我们还是从 WaitGroup 的基本用法学起吧。
WaitGroup 的基本用法
Go 标准库中的 WaitGroup 提供了三个方法,保持了 Go 简洁的风格。
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
我们分别看下这三个方法:
- Add,用来设置 WaitGroup 的计数值;
- Done,用来将 WaitGroup 的计数值减 1,其实就是调用了 Add(-1);
- Wait,调用这个方法的 goroutine 会一直阻塞,直到 WaitGroup 的计数值变为 0。
接下来,我们通过一个使用 WaitGroup 的例子,来看下 Add、Done、Wait 方法的基本用法。
在这个例子中,我们使用了以前实现的计数器 struct。我们启动了 10 个 worker,分别对计数值加一,10 个 worker 都完成后,我们期望输出计数器的值。
// 线程安全的计数器
type Counter struct {
mu sync.Mutex
count uint64
}
// 对计数值加一
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// 获取当前的计数值
func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// sleep 1 秒,然后计数值加 1
func worker(c *Counter, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
c.Incr()
}
func main() {
var counter Counter
var wg sync.WaitGroup
wg.Add(10) // WaitGroup 的值设置为 10
for i := 0; i < 10; i++ { // 启动 10 个 goroutine 执行加 1 任务
go worker(&counter, &wg)
}
// 检查点,等待 goroutine 都完成任务
wg.Wait()
// 输出当前计数器的值
fmt.Println(counter.Count())
}
我们一起来分析下这段代码。
- 第 28 行,声明了一个 WaitGroup 变量,初始值为零。
- 第 29 行,把 WaitGroup 变量的计数值设置为 10。因为我们需要编排 10 个 goroutine(worker) 去执行任务,并且等待 goroutine 完成。
- 第 35 行,调用 Wait 方法阻塞等待。
- 第 32 行,启动了 goroutine,并把我们定义的 WaitGroup 指针当作参数传递进去。goroutine 完成后,需要调用 Done 方法,把 WaitGroup 的计数值减 1。等 10 个 goroutine 都调用了 Done 方法后,WaitGroup 的计数值降为 0,这时,第 35 行的主 goroutine 就不再阻塞,会继续执行,在第 37 行输出计数值。
这就是我们使用 WaitGroup 编排这类任务的常用方式。而“这类任务”指的就是,需要启动多个 goroutine 执行任务,主 goroutine 需要等待子 goroutine 都完成后才继续执行。
熟悉了 WaitGroup 的基本用法后,我们再看看它具体是如何实现的吧。
WaitGroup 的实现
首先,我们看看 WaitGroup 的数据结构。它包括了一个 noCopy 的辅助字段,一个 state1 记录 WaitGroup 状态的数组。
- noCopy 的辅助字段,主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。我会在后面和你详细分析这个字段;
- state1,一个具有复合意义的字段,包含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。
WaitGroup 的数据结构定义以及 state 信息的获取方法如下:
type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉 vet 工具违反了复制使用的规则
noCopy noCopy
// 64bit(8bytes) 的值分成两段,高 32bit 是计数值,低 32bit 是 waiter 的计数
// 另外 32bit 是用作信号量的
// 因为 64bit 值的原子操作需要 64bit 对齐,但是 32bit 编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
// 总之,会找到对齐的那 64bit 作为 state,其余的 32bit 做信号量
state1 [3]uint32
}
// 得到 state 的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是 64bit 对齐的,数组前两个元素做 state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是 32bit 对齐的,数组后两个元素用来做 state,它可以用来做 64bit 的原子操作,第一个元素 32bit 用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。
在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。
在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。
然后,我们继续深入源码,看一下 Add、Done 和 Wait 这三个方法的实现。
在查看这部分源码实现时,我们会发现,除了这些方法本身的实现外,还会有一些额外的代码,主要是 race 检查和异常检查的代码。其中,有几个检查非常关键,如果检查不通过,会出现 panic,这部分内容我会在下一小节分析 WaitGroup 的错误使用场景时介绍。现在,我们先专注在 Add、Wait 和 Done 本身的实现代码上。
我先为你梳理下 Add 方法的逻辑。Add 方法主要操作的是 state 的计数部分。你可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上。需要注意的是,这个 delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。
它的实现代码如下:
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高 32bit 是计数值 v,所以把 delta 左移 32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)«32)
v := int32(state » 32) // 当前计数值
w := uint32(state) // waiter count
if v > 0 || w == 0 {
return
}
// 如果计数值 v 为 0 并且 waiter 的数量 w 不为 0,那么 state 的值就是 waiter 的数量
// 将 waiter 的数量设置为 0,因为计数值 v 也是 0,所以它们俩的组合*statep 直接设置为 0 即可。此时需要并唤醒所有的 waiter
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
// Done 方法实际就是计数器减 1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。
其主干实现代码如下:
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter 的数量
if v == 0 {
// 如果计数值为 0, 调用这个方法的 goroutine 不必再等待,继续执行它后面的逻辑即可
return
}
// 否则把 waiter 数量加 1。期间可能有并发调用 Wait 的情况,所以最外层使用了一个 for 循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}
使用 WaitGroup 时的常见错误
在分析 WaitGroup 的 Add、Done 和 Wait 方法的实现的时候,为避免干扰,我删除了异常检查的代码。但是,这些异常检查非常有用。
我们在开发的时候,经常会遇见或看到误用 WaitGroup 的场景,究其原因就是没有弄明白这些检查的逻辑。所以接下来,我们就通过几个小例子,一起学习下在开发时绝对要避免的 3 个问题。
常见问题一:计数器设置为负值
WaitGroup 的计数器的值必须大于等于 0。我们在更改这个计数值的时候,WaitGroup 会先做检查,如果计数值被设置为负数,就会导致 panic。
一般情况下,有两种方法会导致计数器设置为负数。
第一种方法是:调用 Add 的时候传递一个负数。如果你能保证当前的计数器加上这个负数后还是大于等于 0 的话,也没有问题,否则就会导致 panic。
比如下面这段代码,计数器的初始值为 10,当第一次传入 -10 的时候,计数值被设置为 0,不会有啥问题。但是,再紧接着传入 -1 以后,计数值就被设置为负数了,程序就会出现 panic。
func main() {
var wg sync.WaitGroup
wg.Add(10)
wg.Add(-10)//将 -10 作为参数调用 Add,计数值被设置为 0
wg.Add(-1)//将 -1 作为参数调用 Add,如果加上 -1 计数值就会变为负数。这是不对的,所以会触发 panic
}
第二个方法是:调用 Done 方法的次数过多,超过了 WaitGroup 的计数值。
使用 WaitGroup 的正确姿势是,预先确定好 WaitGroup 的计数值,然后调用相同次数的 Done 完成相应的任务。比如,在 WaitGroup 变量声明之后,就立即设置它的计数值,或者在 goroutine 启动之前增加 1,然后在 goroutine 中调用 Done。
如果你没有遵循这些规则,就很可能会导致 Done 方法调用的次数和计数值不一致,进而造成死锁(Done 调用次数比计数值少)或者 panic(Done 调用次数比计数值多)。
比如下面这个例子中,多调用了一次 Done 方法后,会导致计数值为负,所以程序运行到这一行会出现 panic。
func main() {
var wg sync.WaitGroup
wg.Add(1)
wg.Done()
wg.Done()
}
常见问题二:不期望的 Add 时机
在使用 WaitGroup 的时候,你一定要遵循的原则就是,等所有的 Add 方法调用之后再调用 Wait,否则就可能导致 panic 或者不期望的结果。
我们构造这样一个场景:只有部分的 Add/Done 执行完后,Wait 就返回。我们看一个例子:启动四个 goroutine,每个 goroutine 内部调用 Add(1) 然后调用 Done(),主 goroutine 调用 Wait 等待任务完成。
func main() {
var wg sync.WaitGroup
go dosomething(100, &wg) // 启动第一个 goroutine
go dosomething(110, &wg) // 启动第二个 goroutine
go dosomething(120, &wg) // 启动第三个 goroutine
go dosomething(130, &wg) // 启动第四个 goroutine
wg.Wait() // 主 goroutine 等待完成
fmt.Println("Done")
}
func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration) // 故意 sleep 一段时间
wg.Add(1)
fmt.Println("后台执行,duration:", duration)
wg.Done()
}
在这个例子中,我们原本设想的是,等四个 goroutine 都执行完毕后输出 Done 的信息,但是它的错误之处在于,将 WaitGroup.Add 方法的调用放在了子 gorotuine 中。等主 goorutine 调用 Wait 的时候,因为四个任务 goroutine 一开始都休眠,所以可能 WaitGroup 的 Add 方法还没有被调用,WaitGroup 的计数还是 0,所以它并没有等待四个子 goroutine 执行完毕才继续执行,而是立刻执行了下一步。
导致这个错误的原因是,没有遵循先完成所有的 Add 之后才 Wait。要解决这个问题,一个方法是,预先设置计数值:
func main() {
var wg sync.WaitGroup
wg.Add(4) // 预先设定 WaitGroup 的计数值
go dosomething(100, &wg) // 启动第一个 goroutine
go dosomething(110, &wg) // 启动第二个 goroutine
go dosomething(120, &wg) // 启动第三个 goroutine
go dosomething(130, &wg) // 启动第四个 goroutine
wg.Wait() // 主 goroutine 等待
fmt.Println("Done")
}
func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration)
fmt.Println("后台执行,duration:", duration)
wg.Done()
}
另一种方法是在启动子 goroutine 之前才调用 Add:
func main() {
var wg sync.WaitGroup
dosomething(100, &wg) // 调用方法,把计数值加 1,并启动任务 goroutine
dosomething(110, &wg) // 调用方法,把计数值加 1,并启动任务 goroutine
dosomething(120, &wg) // 调用方法,把计数值加 1,并启动任务 goroutine
dosomething(130, &wg) // 调用方法,把计数值加 1,并启动任务 goroutine
wg.Wait() // 主 goroutine 等待,代码逻辑保证了四次 Add(1) 都已经执行完了
fmt.Println("Done")
}
func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
wg.Add(1) // 计数值加 1,再启动 goroutine
go func() {
duration := millisecs * time.Millisecond
time.Sleep(duration)
fmt.Println("后台执行,duration:", duration)
wg.Done()
}()
}
可见,无论是怎么修复,都要保证所有的 Add 方法是在 Wait 方法之前被调用的。
常见问题三:前一个 Wait 还没结束就重用 WaitGroup
“前一个 Wait 还没结束就重用 WaitGroup”这一点似乎不太好理解,我借用田径比赛的例子和你解释下吧。在田径比赛的百米小组赛中,需要把选手分成几组,一组选手比赛完之后,就可以进行下一组了。为了确保两组比赛时间上没有冲突,我们在模型化这个场景的时候,可以使用 WaitGroup。
WaitGroup 等一组比赛的所有选手都跑完后 5 分钟,才开始下一组比赛。下一组比赛还可以使用这个 WaitGroup 来控制,因为 WaitGroup 是可以重用的。只要 WaitGroup 的计数值恢复到零值的状态,那么它就可以被看作是新创建的 WaitGroup,被重复使用。
但是,如果我们在 WaitGroup 的计数值还没有恢复到零值的时候就重用,就会导致程序 panic。我们看一个例子,初始设置 WaitGroup 的计数值为 1,启动一个 goroutine 先调用 Done 方法,接着就调用 Add 方法,Add 方法有可能和主 goroutine 并发执行。
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done() // 计数器减 1
wg.Add(1) // 计数值加 1
}()
wg.Wait() // 主 goroutine 等待,有可能和第 7 行并发执行
}
在这个例子中,第 6 行虽然让 WaitGroup 的计数恢复到 0,但是因为第 9 行有个 waiter 在等待,如果等待 Wait 的 goroutine,刚被唤醒就和 Add 调用(第 7 行)有并发执行的冲突,所以就会出现 panic。
总结一下:WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic。
noCopy:辅助 vet 检查
我们刚刚在学习 WaitGroup 的数据结构时,提到了里面有一个 noCopy 字段。你还记得它的作用吗?其实,它就是指示 vet 工具在做检查的时候,这个数据结构不能做值复制使用。更严谨地说,是不能在第一次使用之后复制使用 ( must not be copied after first use)。
你可能会说了,为什么要把 noCopy 字段单独拿出来讲呢?一方面,把 noCopy 字段穿插到 waitgroup 代码中讲解,容易干扰我们对 WaitGroup 整体的理解。另一方面,也是非常重要的原因,noCopy 是一个通用的计数技术,其他并发原语中也会用到,所以单独介绍有助于你以后在实践中使用这个技术。
我们在第 3 讲学习 Mutex 的时候用到了 vet 工具。vet 会对实现 Locker 接口的数据类型做静态检查,一旦代码中有复制使用这种数据类型的情况,就会发出警告。但是,WaitGroup 同步原语不就是 Add、Done 和 Wait 方法吗?vet 能检查出来吗?
其实是可以的。通过给 WaitGroup 添加一个 noCopy 字段,我们就可以为 WaitGroup 实现 Locker 接口,这样 vet 工具就可以做复制检查了。而且因为 noCopy 字段是未输出类型,所以 WaitGroup 不会暴露 Lock/Unlock 方法。
noCopy 字段的类型是 noCopy,它只是一个辅助的、用来帮助 vet 检查用的类型:
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from go vet
.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。
流行的 Go 开发项目中的坑
接下来又到了喝枸杞红枣茶的时间了。你可以稍微休息一下,心态放轻松地跟我一起围观下知名项目犯过的错,比如 copy Waitgroup、Add/Wait 并发执行问题、遗漏 Add 等 Bug。
有网友在 Go 的issue 28123中提了以下的例子,你能发现这段代码有什么问题吗?
type TestStruct struct {
Wait sync.WaitGroup
}
func main() {
w := sync.WaitGroup{}
w.Add(1)
t := &TestStruct{
Wait: w,
}
t.Wait.Done()
fmt.Println(“Finished”)
}
这段代码最大的一个问题,就是第 9 行 copy 了 WaitGroup 的实例 w。虽然这段代码能执行成功,但确实是违反了 WaitGroup 使用之后不要复制的规则。在项目中,我们可以通过 vet 工具检查出这样的错误。
Docker issue 28161 和 issue 27011 ,都是因为在重用 WaitGroup 的时候,没等前一次的 Wait 结束就 Add 导致的错误。Etcd issue 6534 也是重用 WaitGroup 的 Bug,没有等前一个 Wait 结束就 Add。
Kubernetes issue 59574 的 Bug 是忘记 Wait 之前增加计数了,这就属于我们通常认为几乎不可能出现的 Bug。
即使是开发 Go 语言的开发者自己,在使用 WaitGroup 的时候,也可能会犯错。比如 issue 12813,因为 defer 的使用,Add 方法可能在 Done 之后才执行,导致计数负值的 panic。
总结
学完这一讲,我们知道了使用 WaitGroup 容易犯的错,是不是有些手脚被束缚的感觉呢?其实大可不必,只要我们不是特别复杂地使用 WaitGroup,就不用有啥心理负担。
而关于如何避免错误使用 WaitGroup 的情况,我们只需要尽量保证下面 5 点就可以了:
- 不重用 WaitGroup。新建一个 WaitGroup 不会带来多大的资源开销,重用反而更容易出错。
- 保证所有的 Add 方法调用都在 Wait 之前。
- 不传递负数给 Add 方法,只通过 Done 来给计数值减 1。
- 不做多余的 Done 方法调用,保证 Add 的计数值和 Done 方法调用的数量是一样的。
- 不遗漏 Done 方法的调用,否则会导致 Wait hang 住无法返回。
这一讲我们详细学习了 WaitGroup 的相关知识,这里我整理了一份关于 WaitGroup 的知识地图,方便你复习。
思考题
通常我们可以把 WaitGroup 的计数值,理解为等待要完成的 waiter 的数量。你可以试着扩展下 WaitGroup,来查询 WaitGroup 的当前的计数值吗?
欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。
文章作者 anonymous
上次更新 2024-01-16