你好,我是鸟窝。

前面三讲,我们学习了互斥锁 Mutex 的基本用法、实现原理以及易错场景,可以说是涵盖了互斥锁的方方面面。如果你能熟练掌握这些内容,那么,在大多数的开发场景中,你都可以得心应手。

但是,在一些特定的场景中,这些基础功能是不足以应对的。这个时候,我们就需要开发一些扩展功能了。我来举几个例子。

比如说,我们知道,如果互斥锁被某个 goroutine 获取了,而且还没有释放,那么,其他请求这把锁的 goroutine,就会阻塞等待,直到有机会获得这把锁。有时候阻塞并不是一个很好的主意,比如你请求锁更新一个计数器,如果获取不到锁的话没必要等待,大不了这次不更新,我下次更新就好了,如果阻塞的话会导致业务处理能力的下降。

再比如,如果我们要监控锁的竞争情况,一个监控指标就是,等待这把锁的 goroutine 数量。我们可以把这个指标推送到时间序列数据库中,再通过一些监控系统(比如 Grafana)展示出来。要知道,锁是性能下降的“罪魁祸首”之一,所以,有效地降低锁的竞争,就能够很好地提高性能。因此,监控关键互斥锁上等待的 goroutine 的数量,是我们分析锁竞争的激烈程度的一个重要指标

实际上,不论是不希望锁的 goroutine 继续等待,还是想监控锁,我们都可以基于标准库中 Mutex 的实现,通过 Hacker 的方式,为 Mutex 增加一些额外的功能。这节课,我就来教你实现几个扩展功能,包括实现 TryLock,获取等待者的数量等指标,以及实现一个线程安全的队列。

TryLock

我们可以为 Mutex 添加一个 TryLock 的方法,也就是尝试获取排外锁。

这个方法具体是什么意思呢?我来解释一下这里的逻辑。当一个 goroutine 调用这个 TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true;如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么,这个请求锁的 goroutine 就直接返回 false,不会阻塞在方法调用上。

如下图所示,如果 Mutex 已经被一个 goroutine 持有,调用 Lock 的 goroutine 阻塞排队等待,调用 TryLock 的 goroutine 直接得到一个 false 返回。

在实际开发中,如果要更新配置数据,我们通常需要加锁,这样可以避免同时有多个 goroutine 并发修改数据。有的时候,我们也会使用 TryLock。这样一来,当某个 goroutine 想要更改配置数据时,如果发现已经有 goroutine 在更改了,其他的 goroutine 调用 TryLock,返回了 false,这个 goroutine 就会放弃更改。

很多语言(比如 Java)都为锁提供了 TryLock 的方法,但是,Go 官方issue 6123有一个讨论(后来一些 issue 中也提到过),标准库的 Mutex 不会添加 TryLock 方法。虽然通过 Go 的 Channel 我们也可以实现 TryLock 的功能,但是基于 Channel 的实现我们会放在 Channel 那一讲中去介绍,这一次我们还是基于 Mutex 去实现,毕竟大部分的程序员还是熟悉传统的同步原语,而且传统的同步原语也不容易出错。所以这节课,还是希望带你掌握基于 Mutex 实现的方法。

那怎么实现一个扩展 TryLock 方法的 Mutex 呢?我们直接来看代码。

// 复制 Mutex 定义的常量
const (
mutexLocked = 1 « iota // 加锁标识位置
mutexWoken // 唤醒标识位置
mutexStarving // 锁饥饿标识位置
mutexWaiterShift = iota // 标识 waiter 的起始 bit 位置
)

// 扩展一个 Mutex 结构
type Mutex struct {
sync.Mutex
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
// 如果能成功抢到锁
if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
return true
}

// 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回 false  
old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))  
if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {  
    return false  
}  

// 尝试在竞争的状态下请求锁  
new := old | mutexLocked  
return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)  

}

第 17 行是一个 fast path,如果幸运,没有其他 goroutine 争这把锁,那么,这把锁就会被这个请求的 goroutine 获取,直接返回。

如果锁已经被其他 goroutine 所持有,或者被其他唤醒的 goroutine 准备持有,那么,就直接返回 false,不再请求,代码逻辑在第 23 行。

如果没有被持有,也没有其它唤醒的 goroutine 来竞争锁,锁也不处于饥饿状态,就尝试获取这把锁(第 29 行),不论是否成功都将结果返回。因为,这个时候,可能还有其他的 goroutine 也在竞争这把锁,所以,不能保证成功获取这把锁。

我们可以写一个简单的测试程序,来测试我们的 TryLock 的机制是否工作。

这个测试程序的工作机制是这样子的:程序运行时会启动一个 goroutine 持有这把我们自己实现的锁,经过随机的时间才释放。主 goroutine 会尝试获取这把锁。如果前一个 goroutine 一秒内释放了这把锁,那么,主 goroutine 就有可能获取到这把锁了,输出“got the lock”,否则没有获取到也不会被阻塞,会直接输出“can't get the lock”。

func try() {
var mu Mutex
go func() { // 启动一个 goroutine 持有一段时间的锁
mu.Lock()
time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
mu.Unlock()
}()

time.Sleep(time.Second)  

ok := mu.TryLock() // 尝试获取到锁  
if ok { // 获取成功  
    fmt.Println("got the lock")  
    // do something  
    mu.Unlock()  
    return  
}  

// 没有获取到  
fmt.Println("can't get the lock")  

}

获取等待者的数量等指标

接下来,我想和你聊聊怎么获取等待者数量等指标。

第二讲中,我们已经学习了 Mutex 的结构。先来回顾一下 Mutex 的数据结构,如下面的代码所示。它包含两个字段,state 和 sema。前四个字节(int32)就是 state 字段。

type Mutex struct {
state int32
sema uint32
}

Mutex 结构中的 state 字段有很多个含义,通过 state 字段,你可以知道锁是否已经被某个 goroutine 持有、当前是否处于饥饿状态、是否有等待的 goroutine 被唤醒、等待者的数量等信息。但是,state 这个字段并没有暴露出来,所以,我们需要想办法获取到这个字段,并进行解析。

怎么获取未暴露的字段呢?很简单,我们可以通过 unsafe 的方式实现。我来举一个例子,你一看就明白了。

const (
mutexLocked = 1 « iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
)

type Mutex struct {
sync.Mutex
}

func (m *Mutex) Count() int {
// 获取 state 字段的值
v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
v = v » mutexWaiterShift //得到等待者的数值
v = v + (v & mutexLocked) //再加上锁持有者的数量,0 或者 1
return int(v)
}

这个例子的第 14 行通过 unsafe 操作,我们可以得到 state 字段的值。第 15 行我们右移三位(这里的常量 mutexWaiterShift 的值为 3),就得到了当前等待者的数量。如果当前的锁已经被其他 goroutine 持有,那么,我们就稍微调整一下这个值,加上一个 1(第 16 行),你基本上可以把它看作是当前持有和等待这把锁的 goroutine 的总数。

state 这个字段的第一位是用来标记锁是否被持有,第二位用来标记是否已经唤醒了一个等待者,第三位标记锁是否处于饥饿状态,通过分析这个 state 字段我们就可以得到这些状态信息。我们可以为这些状态提供查询的方法,这样就可以实时地知道锁的状态了。

// 锁是否被持有
func (m *Mutex) IsLocked() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexLocked == mutexLocked
}

// 是否有等待者被唤醒
func (m *Mutex) IsWoken() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexWoken == mutexWoken
}

// 锁是否处于饥饿状态
func (m *Mutex) IsStarving() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexStarving == mutexStarving
}

我们可以写一个程序测试一下,比如,在 1000 个 goroutine 并发访问的情况下,我们可以把锁的状态信息输出出来:

func count() {
var mu Mutex
for i := 0; i < 1000; i++ { // 启动 1000 个 goroutine
go func() {
mu.Lock()
time.Sleep(time.Second)
mu.Unlock()
}()
}

time.Sleep(time.Second)  
// 输出锁的信息  
fmt.Printf("waitings: %d, isLocked: %t, woken: %t,  starving: %t\n", mu.Count(), mu.IsLocked(), mu.IsWoken(), mu.IsStarving())  

}

有一点你需要注意一下,在获取 state 字段的时候,并没有通过 Lock 获取这把锁,所以获取的这个 state 的值是一个瞬态的值,可能在你解析出这个字段之后,锁的状态已经发生了变化。不过没关系,因为你查看的就是调用的那一时刻的锁的状态。

使用 Mutex 实现一个线程安全的队列

最后,我们来讨论一下,如何使用 Mutex 实现一个线程安全的队列。

为什么要讨论这个话题呢?因为 Mutex 经常会和其他非线程安全(对于 Go 来说,我们其实指的是 goroutine 安全)的数据结构一起,组合成一个线程安全的数据结构。新数据结构的业务逻辑由原来的数据结构提供,而 Mutex 提供了锁的机制, 保证线程安全

比如队列,我们可以通过 Slice 来实现,但是通过 Slice 实现的队列不是线程安全的,出队(Dequeue)和入队(Enqueue)会有 data race 的问题。这个时候,Mutex 就要隆重出场了,通过它,我们可以在出队和入队的时候加上锁的保护。

type SliceQueue struct {
data []interface{}
mu sync.Mutex
}

func NewSliceQueue(n int) (q *SliceQueue) {
return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
q.mu.Lock()
q.data = append(q.data, v)
q.mu.Unlock()
}

// Dequeue 移去队头并返回
func (q *SliceQueue) Dequeue() interface{} {
q.mu.Lock()
if len(q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0]
q.data = q.data[1:]
q.mu.Unlock()
return v
}

因为标准库中没有线程安全的队列数据结构的实现,所以,你可以通过 Mutex 实现一个简单的队列。通过 Mutex 我们就可以为一个非线程安全的 data interface{}实现线程安全的访问。

总结

好了,我们来做个总结。

Mutex 是 package sync 的基石,其他的一些同步原语也是基于它实现的,所以,我们“隆重”地用了四讲来深度学习它。学到后面,你一定能感受到,多花些时间来完全掌握 Mutex 是值得的。

今天这一讲我和你分享了几个 Mutex 的拓展功能,这些方法是不是给你带来了一种“骇客”的编程体验呢,通过 Hacker 的方式,我们真的可以让 Mutex 变得更强大。

我们学习了基于 Mutex 实现 TryLock,通过 unsafe 的方式读取到 Mutex 内部的 state 字段,这样,我们就解决了开篇列举的问题,一是不希望锁的 goroutine 继续等待,一是想监控锁。

另外,使用 Mutex 组合成更丰富的数据结构是我们常见的场景,今天我们就实现了一个线程安全的队列,未来我们还会讲到实现线程安全的 map 对象。

到这里,Mutex 我们就系统学习完了,最后给你总结了一张 Mutex 知识地图,帮你复习一下。

思考题

你可以为 Mutex 获取锁时加上 Timeout 机制吗?会有什么问题吗?

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。