Go并发

互斥锁Mutex

说起并发访问问题,真是太常见了,比如多个 goroutine 并发更新同一个资源,像计数器;同时更新用户的账户信息;秒杀系统;往同一个 buffer 中并发写入数据等等。如果没有互斥控制,就会出现一些异常情况,比如计数器的计数不准确、用户的账户可能出现透支、秒杀系统出现超卖、buffer 中的数据混乱,等等,后果都很严重。

这些问题怎么解决呢?对,用互斥锁,那在 Go 语言里,就是 Mutex

临界区:在并发编程中,如果程序中的一部分会被并发访问或修改,那么,为了避免并发访问导致的意想不到的结果,这部分程序需要被保护起来,这部分被保护起来的程序,就叫做临界区。

如果很多线程同步访问临界区,就会造成访问或操作错误,这当然不是我们希望看到的结果。所以,我们可以使用互斥锁,限定临界区只能同时由一个线程持有

Mutex 的架构演进分成了四个阶段:

互斥锁1.0

通过一个 flag 变量,标记当前的锁是否被某个 goroutine 持有。如果 这个 flag 的值是 1,就代表锁已经被持有,那么,其它竞争的 goroutine 只能等待;如果 这个 flag 的值是 0,就可以通过 CAS(compare-and-swap,或者 compare-and-set) 将这个 flag 设置为 1,标识锁被当前的这个 goroutine 持有了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// CAS操作,当时还没有抽象出atomic包
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
// 互斥锁的结构,包含两个字段
type Mutex struct {
key int32 // 锁是否被持有的标识
sema int32 // 信号量专用,用以阻塞/唤醒goroutine
}

// 保证成功在val上增加delta的值
func xadd(val *int32, delta int32) (new int32) {
for {
v := *val
if cas(val, v, v + delta) {
return v + delta
}
}
panic("unreached")
}

// 请求锁
func (m *Mutex) Lock(){
if xadd(&m.key, 1) == 1 {//标识加1,如果等于1,成功获取到锁
return
}
semacquire(&m.sema) // 否则阻塞等待
}


func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
return
}
semrelease(&m.sema) // 唤醒其它阻塞的goroutine
}

Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至今。

所以,我们在使用 Mutex 的时候,必须要保证 goroutine 尽可能不去释放自己未持有的 锁,一定要遵循谁申请,谁释放的原则。

但是,初版的 Mutex 实现有一个问题:
请求锁的 goroutine 会排队等待获取互斥锁。虽然这貌似很公平,但是从性能上来看,却不是最优的。因为如果我们能够把锁交给正在占 用 CPU 时间片的 goroutine 的话,那就不需要做上下文的切换,在高并发的情况下,可能会有更好的性能。

互斥锁2.0:给新人机会

1
2
3
4
5
6
7
8
9
10
11
type Mutex struct {
state int32 //互斥锁上锁状态枚举值如下所示
sema uint32 //信号量,向处于Gwaitting的G发送信号
}

const (
mutexLocked = 1 << iota // 1 互斥锁是锁定的
mutexWoken // 2 唤醒锁
mutexWaiterShift = iota // 2 统计阻塞在这个互斥锁上的goroutine数目需要移位的数值
)

虽然 Mutex 结构体还是包含两个字段,但是第一个字段已经改成了 state,它的含义也不 一样了。

state 是一个复合型的字段,一个字段包含多个意义,这样可以通过尽可能少的内存来实现 互斥锁。

这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有 唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

func (m *Mutex) Lock() {
// Fast path: 幸运case,能够直接获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
for {
old := m.state
new := old | mutexLocked // 新状态加锁
if old&mutexLocked != 0 {
new = old + 1<<mutexWaiterShift //等待者数量加一
}
if awoke {
// goroutine是被唤醒的, 新状态清除唤醒标志
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
if old&mutexLocked == 0 { // 锁原状态未加锁
break
}
runtime.Semacquire(&m.sema) // 请求信号量
awoke = true
}
}
}

如果想要获取锁的 goroutine 没有机会获取到锁,就会进行休眠,但是在 锁释放唤醒之后,它并不能像先前一样直接获取到锁,还是要和正在请求锁的 goroutine 进行争。这会给后来请求锁的 goroutine 一个机会,也让 CPU 中正在执行的 goroutine 有更多的机会获取到锁,在一定程度上提高了程序的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
panic("sync: unlock of unlocked mutex")
}

old := new
for {
// 如果没有其它的 waiter,说明对这个锁的竞争的 goroutine 只有一个,那就 可以直接返回了
// 如果这个时候有唤醒的 goroutine,或者是又被别人加了锁,那么其它 goroutine 自己干得都很好,当前的这个 goroutine 就可以放心返回 了。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// 有等待者,并且没有唤醒的 waiter,那就需要唤醒一个等待的 waiter。
// 在唤醒之前,需要将 waiter 数量减 1,并且将 mutexWoken 标志设置上
new = (old - 1<<mutexWaiterShift) | mutexWoken //新状态,准备唤醒 goroutine
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime.Semrelease(&m.sema)
return
}
old = m.state
}
}

相对于初版的设计,这次的改动主要就是,新来的 goroutine 也有机会先获取到锁,甚至 一个 goroutine 可能连续获取到锁,打破了先来先得的逻辑。但是,代码复杂度也显而易见。

互斥锁3.0:多给些机会

在 2015 年 2 月的改动中,如果新来的 goroutine 或者是被唤醒的 goroutine 首次获取不 到锁,它们就会通过自旋(runtime 中实现的)的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (m *Mutex) Lock() {
// Fast path: 幸运case,能够直接获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}

awoke := false
iter := 0 // 3.0新增,循环计数器
for { // 不管是新来的请求锁的 goroutine, 还是被唤醒的 goroutine,都不断尝试请求锁
old := m.state // 先保存当前锁的状态
new := old | mutexLocked // 新状态加锁
if old&mutexLocked != 0 {// 锁还没被释放
if runtime_canSpin(iter) { // 3.0新增,检查是否可以进入自旋锁
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true //awoke标记为true
}
runtime_doSpin() //进入自旋状态
iter++
continue// 自旋,再次请求锁
}
//没有获取到锁,当前G进入Gwaitting状态
new = old + 1<<mutexWaiterShift //等待者数量加一
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// goroutine是被唤醒的, 新状态清除唤醒标志
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
if old&mutexLocked == 0 { // 锁原状态未加锁
break
}
// 锁请求失败,进入休眠状态,等待信号唤醒后重新开始循环
runtime_Semacquire(&m.sema) // 阻塞等待
awoke = true// 被唤醒
iter = 0
}
}
}

互斥锁4.0:解决饥饿

经过几次优化,Mutex 的代码越来越复杂,应对高并发争抢锁的场景也更加公平。但是你 有没有想过,因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的 goroutine 可能会一直获取不到锁,这就是饥饿问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving // 从state字段中分出一个饥饿标记
mutexWaiterShift = iota

starvationThresholdNs = 1e6
)

func (m *Mutex) Lock() {
// Fast path: 幸运之路,一下就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待时间
starving := false // 饥饿标记
awoke := false // 唤醒标记
iter := 0 // 自旋次数
old := m.state // 当前的锁的状态

for {
// 锁是非饥饿状态,锁还没被释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋
runtime_doSpin()
iter++// 自旋次数加1
old = m.state// 再次获取锁的状态,之后会检查是否锁被释放了
continue
}

new := old
if old&mutexStarving == 0 {
new |= mutexLocked //非饥饿状态,加锁
}
if old&(mutexLocked|mutexStarving) != 0 {
// 如果当前被锁定或者处于饥饿模式,则waiter加一,表示等待一个等待计数
new += 1 << mutexWaiterShift
}
// 如果是饥饿状态,并且已经上锁了,那么mutexStarving状态位设置为1,设置为饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// awoke为true则表明当前线程在上面自旋的时候,修改mutexWoken状态成功
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除唤醒标志位
new &^= mutexWoken
}
// 成功设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原来状态没有上锁,也没有饥饿,那么直接返回,表示获取到锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 处理饥饿状态

// 2.如果以前就在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 { // 3.如果等待时间为0,那么初始化等待时间
waitStartTime = runtime_nanotime()
}

runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 4.阻塞等待
// 5.唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回
if old&mutexStarving != 0 { // 6.判断是否已经处于饥饿状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}

delta := int32(mutexLocked - 1<<mutexWaiterShift) // 7.加锁并且将waiter数减1
if !starving || old>>mutexWaiterShift == 1 {
// 8.如果当前goroutine不是饥饿状态,就从饥饿模式切换会正常模式
delta -= mutexStarving
}

atomic.AddInt32(&m.state, delta) // 9.设置状态
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

}


func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
//返回一个state被减后的值
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
//如果返回的state值不为0,那么进入到unlockSlow中
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果没有 waiter,或者已经有在处理的情况,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// waiter 数减 1,mutexWoken 标志设置上,通过 CAS 更新 state 的值
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 直接唤醒等待队列中的 waiter
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 饥饿模式
// 直接唤醒等待队列中的 waiter
runtime_Semrelease(&m.sema, true, 1)
}
}

跟之前的实现相比,当前的 Mutex 最重要的变化,就是增加饥饿模式。将饥饿模 式的最大等待时间阈值设置成了 1 毫秒,这就意味着,一旦等待者等待的时间超过了这个 阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁,新来的同学主动谦让一下,给老同志一些机会。

这部分的图解是休眠前的操作,休眠前会根据old的状态来判断能不能直接获取到锁,如果old状态没有上锁,也没有饥饿,那么直接break返回,因为这种情况会在CAS中设置加锁;

接着往下判断,waitStartTime是否等于0,如果不等于,说明不是第一次来了,而是被唤醒后来到这里,那么就不能直接放到队尾再休眠了,而是要放到队首,防止长时间抢不到锁.

正常模式下,waiter 都是进入先入先出队列,被唤醒的 waiter 并不会直接持有锁,而是要 和新来的 goroutine 进行竞争。
在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,也不会 自旋,直接加入到等待队列的尾部。

扩展

TryLock

当一个 goroutine 调用这个 TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true;如果这把锁已经被其他 goroutine 所持有,或 者是正在准备交给某个被唤醒的 goroutine,那么,这个请求锁的 goroutine 就直接返回 false,不会阻塞在方法调用上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 复制Mutex定义的常量
const (
mutexLocked = 1 << iota // 加锁标识位置
mutexWoken // 唤醒标识位置
mutexStarving // 锁饥饿标识位置
mutexWaiterShift = iota // 标识waiter的起始bit位置
)

// 扩展一个Mutex结构
type Mutex struct {
sync.Mutex
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
// 如果能成功抢到锁
if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked)
return true
}
// 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false
old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
return false
}
// 尝试在竞争的状态下请求锁
new := old | mutexLocked
return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}

使用 Mutex 实现一个线程安全的队列

Mutex 经常会和其他非线程安全(对于 Go 来说,我们其实指的是 goroutine 安全)的数据结构一起,组合成一个线程安全的数据结构。新数据结构的业务逻辑由原来的数据结构提供,而 Mutex 提供了锁的机制,来保证线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type SliceQueue struct {
data []interface{}
mu sync.Mutex
}

func NewSliceQueue(n int) (q *SliceQueue) {
return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
q.mu.Lock()
q.data = append(q.data, v)
q.mu.Unlock()
}

// Dequeue 移去队头并返回
func (q *SliceQueue) Dequeue() interface{} {
q.mu.Lock()
if len(q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0]
q.data = q.data[1:]
q.mu.Unlock()
return v
}

读写锁RWMutex

标准库中的 RWMutex 是一个 reader/writer 互斥锁。RWMutex 在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。

实现原理

readers-writers 问题一般有三类,基于对读和写操作的优先级,读写锁的设计和实现也分成三类:

  • Read-preferring:读优先的设计可以提供很高的并发性,但是,在竞争激烈的情况下 可能会导致写饥饿。这是因为,如果有大量的读,这种设计会导致只有所有的读都释放 了锁之后,写才可能获取到锁。
  • Write-preferring:写优先的设计意味着,如果已经有一个 writer 在等待请求锁的 话,它会阻止新来的请求锁的 reader 获取到锁,所以优先保障 writer。
  • 不指定优先级:不区分 reader 和 writer 优先级,某些场景下这种不指定优先级的设计反而更有效,因为第一类优先级会导致写饥饿,第二类优先级可能会导致读饥饿,这种不指定优先级的访问不再区分读写,大家都是同一个优先级,解决了饥饿的问题。

Go 标准库中的 RWMutex 是基于 Mutex 实现的,设计是 Write-preferring 方案。一个正在阻塞的 Lock 调用会排除新的 reader 请求到锁。

RLock/RUnlock 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type RWMutex struct {
w Mutex // 互斥锁解决多个writer的竞争
writerSem uint32 // writer信号量
readerSem uint32 // reader信号量
readerCount int32 // reader的数量(以及是否有 writer 竞争锁)
readerWait int32 // writer等待完成的reader的数量
}

const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) RLock() {

// 读锁个数 + 1, 在什么情况下会<0呢?在有写锁被获取的时候
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// sleep 等待写锁释放时唤醒
runtime_SemacquireMutex(&rw.readerSem, false)
}
}
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r) // 有等待的writer
}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false) // 最后一个reader了,writer终于有机会获得锁了
}
}


Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14

func (rw *RWMutex) Lock() {
// 首先解决其他writer竞争问题
rw.w.Lock()
// 反转readerCount,告诉reader有writer竞争锁
// 读锁个数(readerCount)瞬间减去一个非常大的数(const rwmutexMaxReaders = 1 << 30),为什么需要这样做呢?
// 其实在上面RLock()中是通过判断readerCount来决定是否可以获取读锁的,当readerCount变成一个非常小的负数时会导致无法获取到读锁
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 这里的r其实就是原来的readerCount。r!=0 说明还有读锁未释放
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// sleep 等待读锁释放唤醒
runtime_SemacquireMutex(&rw.writerSem, false)
}
}

Unlock

1
2
3
4
5
6
7
8
9
10
11
12
func (rw *RWMutex) Unlock() {
// lock()的时候减了一个非常大的数,现在加回去,告诉reader没有活跃的writer了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

// 唤醒阻塞的reader们
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// 释放内部的互斥锁
rw.w.Unlock()
}

WaitGroup

WaitGroup 很简单,就是 package sync 用来做任务编排的一个并发原语。它要解决的就是并发 - 等待的问题: 现在有一个 goroutine A 在检查点(checkpoint)等待一组 goroutine 全部完成,如果 在执行任务的这些 goroutine 还没全部完成,那么 goroutine A 就会阻塞在检查点,直到 所有 goroutine 都完成后才能继续执行。

实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type WaitGroup struct {
noCopy noCopy // 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
state1 [3]uint32
// 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
// 另外32bit是用作信号量的
// 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样
// 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
}

// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}

}


Add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 获取 counter, 当前计数值
w := uint32(state) // 获取 waiter

if v > 0 || w == 0 { // counter > 0:还有任务在执行;waiter == 0 表示没有在阻塞等待的 goroutine
return
}

// 执行到此处相当于 countr = 0,即所有的任务都已执行完,需要唤起等待的 goroutine了
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}

// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

func (wg *WaitGroup) Wait() {
statep, semap := wg.state()

for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter的数量
if v == 0 {
// 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
return
}
// 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}

-------- 本文结束 感谢阅读 --------

本文标题:Go并发

文章作者:Guyuqing

发布时间:2021年09月21日 - 14:25

最后更新:2021年09月21日 - 23:16

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

坚持技术分享,您的支持将鼓励我继续创作!
0%