Lock

开端

今天学习下go里面的sync.mutex的实现以及相关扩展知识。

锁的介绍

首先,计算机中的锁是为了控制并发情况下,对同一资源的并发访问。锁呢,有利有弊。好的点在于,我们可以控制并发访问的顺序逻辑。避免程序因为资源竞争,而出现一些预期外的情况。 不好的点在于,加锁意味着并发度的下降,效率的下降。所以我们在使用锁来完成业务需求的时候,也要考虑锁竞争对业务带来的影响。根据业务情况确定是否使用及使用的方式。

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// A Mutex is a mutual exclusion lock.
// The zero Val for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota

	// Mutex fairness.
	//
	// Mutex can be in 2 modes of operations: normal and starvation.
	// In normal mode waiters are queued in FIFO order, but a woken up waiter
	// does not own the mutex and competes with new arriving goroutines over
	// the ownership. New arriving goroutines have an advantage -- they are
	// already running on CPU and there can be lots of them, so a woken up
	// waiter has good chances of losing. In such case it is queued at front
	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
	// it switches mutex to the starvation mode.
	//
	// In starvation mode ownership of the mutex is directly handed off from
	// the unlocking goroutine to the waiter at the front of the queue.
	// New arriving goroutines don't try to acquire the mutex even if it appears
	// to be unlocked, and don't try to spin. Instead they queue themselves at
	// the tail of the wait queue.
	//
	// If a waiter receives ownership of the mutex and sees that either
	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
	// it switches mutex back to normal operation mode.
	//
	// Normal mode has considerably better performance as a goroutine can acquire
	// a mutex several times in a row even if there are blocked waiters.
	// Starvation mode is important to prevent pathological cases of tail latency.
	starvationThresholdNs = 1e6
)

上面代码就是go1.18中sync.mutex的定义。可以看到Mutex结构体中有state和sema两个字段,

  • state int32类型,代表的是锁的状态.
  • sema uint32类型,代表信号量。他主要用于唤醒阻塞在互斥锁上的其他协程。

锁的状态

m.state

图片来自Draveness大神,state有以下几种状态:

  • mutexLocked。锁状态,占1bit,0-可以获取锁,1-锁定状态,阻塞等待
  • mutexWoken。唤醒状态。占1bit,代表一个过程阶段,0-没有协程唤醒 1-有协程被唤醒,申请锁定过程中。
  • mutexStarving。饥饿状态。占1bit,当协程超过1ms还没有获取锁时,锁就会处于饥饿状态。
  • mutexWaiterShift/waitersCount。等待信号量状态。占29bit,当有协程释放锁时,需要根据此状态,决定是否释放信号量。用于通知其他协程获取此锁。

实现原理

sync.Mutex的实现代码只有200多行,但是里面锁的切换控制还是比较复杂,下面我们逐步来分析

加锁过程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

Lock函数使用CompareAndSwapInt32判断m.state是不是0,如果是的话state设为lock状态,成功获取到锁。 失败则调用lockSlow()函数,这个函数是实现状态控制的主要逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

slowLock函数依靠CAS+信号量+自旋来实现。下面我们对照代码,逐行分析下逻辑:

  • 1.在调用slowLock之前,已经判断过state是否可以获取锁。进入slowLock后会阻塞当前G,尝试获取锁。 首先判断是否满足条件: 【锁处于非饥饿状态, locked状态,并且可以自旋】,满足即开始自旋,在自旋的过程中尝试将锁的状态设置为唤醒,尽量让当前G获取的锁。

  • 2.当在自旋的过程中发现可以获取锁时,进入下面逻辑。用old初始化一个临时的new state。判断锁如果不处于饥饿状态,new state加上locked状态。 (饥饿状态下的锁,G是需要排队才可以获取的,源码注释也有)。接着,判断old状态是locked或者饥饿时,将waiterShift加8,代表等待的G加1。 接着判断如果此G已经处于饥饿状态,并且old已经处于locked状态。new state就增加饥饿状态。(文中的翻译说明了原因,当前的goroutine将mutex设为饥饿状态, 但是如果mutex已经解锁的话,就不要进行设定了。因为Unlock需要处于饥饿状态的mutex有等待者)。 判断当前G是否是被唤醒的,是的话将new state设置成非唤醒。

  • 3.接下来要注意,这里再次将m.state和old进行了比较。主要是判断有其他的goroutine改变mutex的状态。如果有的话new state就作废,完成本次逻辑,继续循环自旋尝试获取锁。 如果没有的话,当前无锁+不饥饿就可以获取锁成功break。否则加锁失败,G需要进入队列,如果G第一次等待就放在队尾,否则就是被唤醒再次获取锁失败。就会被放在队首。 判断当前G是否该进入饥饿状态的标准是G等待mutex的时间是否大于1ms。重新获取mutex的状态,如果处于饥饿状态获取锁成功。此时需要考虑解除锁的饥饿状态。 满足1.当前G等待时间小于1ms 2.等待队列为空 两个条件其一即可退出饥饿模式。

上面是对照代码的理解过程,下面我们总结下: 正常情况下, 当一个Goroutine获取到锁后, 其他的G获取锁时会自旋或者进入睡眠状态,等待信号量唤醒,自旋是希望当前的G能获取到锁,因为它持有CPU, 可以避免资源切换。但是又不能一直自旋, 所以mutex就会有饥饿模式,如果一个G被唤醒过后, 仍然没有拿到锁, 那么该G会放在等待队列的最前面。 并且那些等待超过1ms的G还没有获取到锁,该G就会进入饥饿状态。该G是饥饿状态并且Mutex是Locked状态时,才有可能给Mutex设置成饥饿状态。

获取到锁的G解锁时,将mutex的状态设为unlock,然后发出信号,等待的G开始抢夺锁的。但是如果mutex处于饥饿状态,就会将信号发给第一个G,唤醒它。这就是G排队。

解锁过程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

解除mutex的lock状态,获取到最新的状态new。如果new是饥饿状态,唤醒第一个G,获取锁。解锁完成。 如果不是饥饿状态。当前没有G等待,或者有G已经被唤醒去加锁了。就不需要做唤醒的动作。退出即可。 否则将等待的G-1,并一直尝试将G设置为唤醒状态,释放信号量,通知所有的G都可以去抢夺锁。设置成功解锁完成,否则继续执行。

引申问题

源码阅读注意的点

在理解sync.mutex的时候,一定要注意的点是,m的state在代码执行过程中,很可能会有其他的G改变其状态。所以查看代码逻辑的时候, 一定要时刻记得这个点

woken状态的意义

Woken状态用于加锁和解锁过程的通信,举个例子,同一时刻,两个协程一个在加锁,一个在解锁,在加锁的协程可能在自旋过程中, 此时把Woken标记为1,用于通知解锁协程不必释放信号量了,好比在说:你只管解锁好了,不必释放信号量,我马上就拿到锁了。

为什么重复解锁会panic

Unlock的逻辑是,解除mutex的lock状态,然后检查是否有协程等待,有的话释放信号量,唤醒协程。如果多次unlock的话,就会 发送多个信号量,唤醒多个G去抢夺锁。会造成不必要的竞争,也会造成协程切换,浪费资源,实现复杂度也会增加。

G的饥饿和mutex饥饿的关系

只有G处于饥饿状态的时候,才会将mutex设为饥饿状态。当mutex处于饥饿状态时,才可能会让饥饿的G获取到锁。需要注意的是,设mutex 为饥饿状态的G不一定会获取到锁,有可能会被其他G截胡。

G可以成功获取锁的情况

  • 第一次加锁的时候m.state=0,一定是可以获取锁。没有其他的G获取锁,没有改变其状态。
  • 当前的mutex不是饥饿状态,也不是lock状态,尝试CAS加锁的时候,如果没有其他G改变m状态,可以成功。
  • 某个G被唤醒后,重新获取mutex,此时mutex处于饥饿状态,没有其他的G来抢夺,因为这个时候只唤醒了饥饿的G,G也可以成功。

参考资料