源代码参考: sync/cond.go 数据结构:

type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}

重点关注下 notifyList 的实现.

notifyList 实现原理

数据结构

需要注意的是, 这里的notifyList最终的实现其实是 sema.go 中 notifyList.

type notifyList struct {
    // wait is the ticket number of the next waiter. It is atomically
    // incremented outside the lock.
    wait uint32

    // notify is the ticket number of the next waiter to be notified. It can
    // be read outside the lock, but is only written to with lock held.
    //
    // Both wait & notify can wrap around, and such cases will be correctly
    // handled as long as their "unwrapped" difference is bounded by 2^31.
    // For this not to be the case, we'd need to have 2^31+ goroutines
    // blocked on the same condvar, which is currently not possible.
    notify uint32

    // List of parked waiters.
    lock mutex
    head *sudog
    tail *sudog
}

通过注释, 我们能够发现, notifyList 有一个ticket概念, 本质上就是一个 自增id, waiter表示下一个要分配的ticket, notify 则指向下一个需要被通知的waiter, 需要注意的是, notify之前的ticket都是已经被通知过的, 所以, 可以通过比较大小避免无效的通知. 甚至, 可以比较wait和notify的大小, 判断是否需要notify(相等的情况下, 表示自从上次notify之后, 没有waiter). head/tail则是 sudog(就是等待的goroutine的表示, 通过双链实现), 那么, 等待/通知的本质上就是挂起/唤醒goroutine.

通过下面的流程看下 notifyList 的实现方式.

  1. notifyListAdd: 将等待着分配一个wait id.
  2. notifyListWait: 获取sudog, 进行实例化, 添加到 notifyList上面. 挂起当前goroutine. (如果开启了pprof, 会进行block time的采样. releatime-to, 唤醒的时候会设置releaseTime为cputicks())
  3. notifyListNotifyAll: 修改notify, 循环双链通知等待的goroutine.
  4. notifyListNotifyOne: 双重检查, 寻找对应的goroutine进行唤醒(注意goroutine的挂起和分配wait顺序不是一致的, goroutine的挂起并不是按照wait排序的, 所以找到wait对应的goroutine需要一次遍历)

notifyList使用

在cond中, 常用的函数的实现如下:

  1. Wait: 调用 runtime_notifyListAdd + runtime_notifyListWait, 分配wait ticket 并挂起当前goroutine, 实现中细化锁的粒度. 在被唤醒后, 这里会归还 sudog.
  2. Signal: 调用 runtime_notifyListNotifyOne 唤醒下一个goroutine
  3. Broadcast: 调用runtime_notifyListNotifyAll, 唤醒所有等待的goroutine

有意思的地方

  1. acquireSudog避免循环调用的方式很有意思. 通过获取m实现. 细节需要在深入
  2. acquireSudog: 从队列的sudoCache中获取一个sudog, 相当于一个 分配行为, acquireSudog在当前q没有sudog的时候, 会去central cache(sched对象的cache)拿一个. 如果还是没有, 就直接分配一个新的. 在归还的时候, 也就是releaseSudog, 会判断是否满了, 满的情况下会讲一半sudog迁移到 central上 (sched对象).
  3. sudog 专门为 sema做了适配, 添加了 ticket 字段, 用来 notifyListNotifyOne 判断当前goroutine是否是需要唤醒的那个.

内部mutex

在实现condition语义的时候, 在wait方法的调用开始的地方, 会先尝试拿到锁. 这里的锁是内部的mutex. sema.go中定义了mutex. 数据结构如下:

// Mutual exclusion locks.  In the uncontended case,
// as fast as spin locks (just a few user-level instructions),
// but on the contention path they sleep in the kernel.
// A zeroed Mutex is unlocked (no need to initialize each lock).
type mutex struct {
    // Futex-based impl treats it as uint32 key,
    // while sema-based impl as M* waitm.
    // Used to be a union, but unions break precise GC.
    key uintptr
}

通过注释可以知道, 在未冲突的情况下, 只是用户级别的指令: 自旋. 在冲突的情况下, 会在内核中睡眠. 执行加锁、释放的逻辑参照 lock_sema.go, mutex使用key有下面几个状态:

const (
    locked uintptr = 1

    active_spin     = 4
    active_spin_cnt = 30
    passive_spin    = 1
)

常用函数如下:

  1. lock: 如果cas成功, 就直接返回; 实例化系统的semaphore, 循环判断当前锁是否被释放, 释放的情况下, 尝试cas加锁; 循环的次数, 会先 采用空循环的策略(procyild), 乐观策略; 仍然失败的情况下, 会采用释放线程的cpu控制权(osyield); 还是失败, 是多个线程抢占, 尝试M排队, 使用 nextwaitm记录上一个排队的m的值, 使用key字段传递semaphore语义, 让unlock的时候通知释放key, 同时将下一个等待的m放入key信息中.
  2. unlock: lock状态下会通过cas将状态重置为 unlock. 非lock状态, 有其他线程排队, 通知其他m. 其中, semacreate是平台相关的, ; atomic.Casuintptr 方法是映射到汇编方法执行的.

yield

  1. procyield
#if defined (__i386__) || defined (__x86_64__)
#include <xmmintrin.h>
#endif
/* Spin wait.  */

void
runtime_procyield (uint32 cnt)
{
  volatile uint32 i;

  for (i = 0; i < cnt; ++i)
    {
#if defined (__i386__) || defined (__x86_64__)
      _mm_pause ();
#endif
    }
}

参照stackOverflow上的回答, _mm_pause 本质上就是一个 pause汇编指令. pause汇编指令参考 linux manual page 的 11.4.4.4, 按照文章的说法, 用来提升 Intel Xeon 处理器性能, 以及减少 Pentium 4 的能量损耗.

  1. osyield
/* Ask the OS to reschedule this thread.  */

void runtime_osyield(void)
  __attribute__ ((no_split_stack));

void
runtime_osyield (void)
{
  sched_yield ();
}

sched_yield参照[linux manual page]](http://www.man7.org/linux/man-pages/man2/sched_yield.2.html). 按照描述, 调用sched_yield之后, 会放弃cpu, 当前线程会被移动到静态优先级的队列的末尾. 需要注意的, 如果当前线程是唯一一个在最高优先级列表的, 那么, 在调用sched_yield()之后, 仍然会运行。

sema

lock的时候使用 semasleep, unlock的时候使用 semawakeup 通知其他m. 这里使用的是操作系统的 semasleep(-1) 和 semawakeup. 这里使用 mOS.waitsema 的指针 作为 semaphore字段.

平台创建

  1. semaphore 这里以 solaris平台为例子. 方法如下:
func semacreate(mp *m) {
    if mp.mos.waitsema != 0 {
        return
    }

    var sem *semt

    // Call libc's malloc rather than malloc. This will
    // allocate space on the C heap. We can't call malloc
    // here because it could cause a deadlock.
    sem = (*semt)(libc_malloc(unsafe.Sizeof(*sem)))
    if sem_init(sem, 0, 0) != 0 {
        throw("sem_init")
    }
    mp.mos.waitsema = uintptr(unsafe.Pointer(sem))
}

这里,先是堆上创建创建了对象, 然后实例化semaphore对象. sem_init参照linux manual

参考:

  1. sema init
  2. sema wait
  3. sem_post