3 minutes
Golang Condition
源代码参考: sync/cond.go 数据结构:
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
重点关注下 notifyList 的实现.
notifyList 实现原理
数据结构
需要注意的是, 这里的notifyList最终的实现其实是 sema.go 中 notifyList.
type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait uint32
// notify is the ticket number of the next waiter to be notified. It can
// be read outside the lock, but is only written to with lock held.
//
// Both wait & notify can wrap around, and such cases will be correctly
// handled as long as their "unwrapped" difference is bounded by 2^31.
// For this not to be the case, we'd need to have 2^31+ goroutines
// blocked on the same condvar, which is currently not possible.
notify uint32
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}
通过注释, 我们能够发现, notifyList 有一个ticket概念, 本质上就是一个 自增id, waiter表示下一个要分配的ticket, notify 则指向下一个需要被通知的waiter, 需要注意的是, notify之前的ticket都是已经被通知过的, 所以, 可以通过比较大小避免无效的通知. 甚至, 可以比较wait和notify的大小, 判断是否需要notify(相等的情况下, 表示自从上次notify之后, 没有waiter). head/tail则是 sudog(就是等待的goroutine的表示, 通过双链实现), 那么, 等待/通知的本质上就是挂起/唤醒goroutine.
通过下面的流程看下 notifyList 的实现方式.
- notifyListAdd: 将等待着分配一个wait id.
- notifyListWait: 获取sudog, 进行实例化, 添加到 notifyList上面. 挂起当前goroutine. (如果开启了pprof, 会进行block time的采样. releatime-to, 唤醒的时候会设置releaseTime为cputicks())
- notifyListNotifyAll: 修改notify, 循环双链通知等待的goroutine.
- notifyListNotifyOne: 双重检查, 寻找对应的goroutine进行唤醒(注意goroutine的挂起和分配wait顺序不是一致的, goroutine的挂起并不是按照wait排序的, 所以找到wait对应的goroutine需要一次遍历)
notifyList使用
在cond中, 常用的函数的实现如下:
- Wait: 调用 runtime_notifyListAdd + runtime_notifyListWait, 分配wait ticket 并挂起当前goroutine, 实现中细化锁的粒度. 在被唤醒后, 这里会归还 sudog.
- Signal: 调用 runtime_notifyListNotifyOne 唤醒下一个goroutine
- Broadcast: 调用runtime_notifyListNotifyAll, 唤醒所有等待的goroutine
有意思的地方
- acquireSudog避免循环调用的方式很有意思. 通过获取m实现. 细节需要在深入
- acquireSudog: 从队列的sudoCache中获取一个sudog, 相当于一个 分配行为, acquireSudog在当前q没有sudog的时候, 会去central cache(sched对象的cache)拿一个. 如果还是没有, 就直接分配一个新的. 在归还的时候, 也就是releaseSudog, 会判断是否满了, 满的情况下会讲一半sudog迁移到 central上 (sched对象).
- sudog 专门为 sema做了适配, 添加了 ticket 字段, 用来 notifyListNotifyOne 判断当前goroutine是否是需要唤醒的那个.
内部mutex
在实现condition语义的时候, 在wait方法的调用开始的地方, 会先尝试拿到锁. 这里的锁是内部的mutex. sema.go中定义了mutex. 数据结构如下:
// Mutual exclusion locks. In the uncontended case,
// as fast as spin locks (just a few user-level instructions),
// but on the contention path they sleep in the kernel.
// A zeroed Mutex is unlocked (no need to initialize each lock).
type mutex struct {
// Futex-based impl treats it as uint32 key,
// while sema-based impl as M* waitm.
// Used to be a union, but unions break precise GC.
key uintptr
}
通过注释可以知道, 在未冲突的情况下, 只是用户级别的指令: 自旋. 在冲突的情况下, 会在内核中睡眠. 执行加锁、释放的逻辑参照 lock_sema.go, mutex使用key有下面几个状态:
const (
locked uintptr = 1
active_spin = 4
active_spin_cnt = 30
passive_spin = 1
)
常用函数如下:
- lock: 如果cas成功, 就直接返回; 实例化系统的semaphore, 循环判断当前锁是否被释放, 释放的情况下, 尝试cas加锁; 循环的次数, 会先 采用空循环的策略(procyild), 乐观策略; 仍然失败的情况下, 会采用释放线程的cpu控制权(osyield); 还是失败, 是多个线程抢占, 尝试M排队, 使用 nextwaitm记录上一个排队的m的值, 使用key字段传递semaphore语义, 让unlock的时候通知释放key, 同时将下一个等待的m放入key信息中.
- unlock: lock状态下会通过cas将状态重置为 unlock. 非lock状态, 有其他线程排队, 通知其他m. 其中, semacreate是平台相关的, ; atomic.Casuintptr 方法是映射到汇编方法执行的.
yield
- procyield
#if defined (__i386__) || defined (__x86_64__)
#include <xmmintrin.h>
#endif
/* Spin wait. */
void
runtime_procyield (uint32 cnt)
{
volatile uint32 i;
for (i = 0; i < cnt; ++i)
{
#if defined (__i386__) || defined (__x86_64__)
_mm_pause ();
#endif
}
}
参照stackOverflow上的回答, _mm_pause 本质上就是一个 pause汇编指令. pause汇编指令参考 linux manual page 的 11.4.4.4, 按照文章的说法, 用来提升 Intel Xeon 处理器性能, 以及减少 Pentium 4 的能量损耗.
- osyield
/* Ask the OS to reschedule this thread. */
void runtime_osyield(void)
__attribute__ ((no_split_stack));
void
runtime_osyield (void)
{
sched_yield ();
}
sched_yield参照[linux manual page]](http://www.man7.org/linux/man-pages/man2/sched_yield.2.html). 按照描述, 调用sched_yield之后, 会放弃cpu, 当前线程会被移动到静态优先级的队列的末尾. 需要注意的, 如果当前线程是唯一一个在最高优先级列表的, 那么, 在调用sched_yield()之后, 仍然会运行。
sema
lock的时候使用 semasleep, unlock的时候使用 semawakeup 通知其他m. 这里使用的是操作系统的 semasleep(-1) 和 semawakeup. 这里使用 mOS.waitsema 的指针 作为 semaphore字段.
平台创建
- semaphore 这里以 solaris平台为例子. 方法如下:
func semacreate(mp *m) {
if mp.mos.waitsema != 0 {
return
}
var sem *semt
// Call libc's malloc rather than malloc. This will
// allocate space on the C heap. We can't call malloc
// here because it could cause a deadlock.
sem = (*semt)(libc_malloc(unsafe.Sizeof(*sem)))
if sem_init(sem, 0, 0) != 0 {
throw("sem_init")
}
mp.mos.waitsema = uintptr(unsafe.Pointer(sem))
}
这里,先是堆上创建创建了对象, 然后实例化semaphore对象. sem_init参照linux manual
参考:
574 Words
2019-05-15 21:58 +0800