介绍

fasthttp 是一个极致复用的http框架, 根据官网, 相比于 net/http, 有 10x性能的提升. fasthttp 也提供了 adapter, 适配到 net/http.

实现

fasthttp 的核心理念就是复用, 在review代码的时候, 发现有大量的 sync.Pool 的使用, 比如 Server的池化: ServerPool, func worker 以及通过第三方组件 “github.com/valyala/bytebufferpool” 实现 byteBuffer 的 syncPool, 实现对 []byte 的复用.

在fasthttp中, 有一个核心的设计: workerPool. workPool 中, 有一个关键组件: workerChan, workerChan 既是资源的基本单位, 也是任务处理的基本单位. 每个到来的请求, 都会分配到workerChan中, 由workerChan的goroutine进行处理.

数据结构

workerChan的数据结构 和 分配如下:

type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    ch.ch <- c
    return true
}

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() {
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

代码中, Serve 是 workerPool 的服务入口, getCh 是 workerChan 的分配逻辑, 实现中, 有两级缓存: ready的chan队列 和 workerChanPool 缓存池, ready队列的设计是 后进先出的, 是为了实现 cpu cache hot. ready队列的 workerChan 都是有 相应的 goroutine 运行的, 实现了 goroutine 池的概念. workerChanPool 只是 sync.Pool 的池化, 没有相应的goroutine, 分配之后, 需要 go 启动一个函数.

clean

那么, workerChan 会不会一直增长呢? 比如, 流量突然增加, 然后随着流量逐渐下滑, goroutine是否也会下降呢?

workerPool在启动的时候, 是会定期执行 clean操作的, 将超过过期时间的空闲的goroutine进行关闭(通过发送nil实现). 如下

func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

    // Clean least recently used workers if they didn't serve connections
    // for more than maxIdleWorkerDuration.
    currentTime := time.Now()

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++
    }
    *scratch = append((*scratch)[:0], ready[:i]...)
    if i > 0 {
        m := copy(ready, ready[i:])
        for i = m; i < n; i++ {
            ready[i] = nil
        }
        wp.ready = ready[:m]
    }
    wp.lock.Unlock()

    // Notify obsolete workers to stop.
    // This notification must be outside the wp.lock, since ch.ch
    // may be blocking and may consume a lot of time if many workers
    // are located on non-local CPUs.
    tmp := *scratch
    for i, ch := range tmp {
        ch.ch <- nil
        tmp[i] = nil
    }
}

需要注意的是, 因为ready 是 后进先出的实现, 所以,

*scratch = append((*scratch)[:0], ready[:i]...)

就是 需要被删除的workerChan.

执行

workerPool 的 workerChan 的执行函数是 workerFunc, 为了保证处理完单个请求不退出 goroutine, workerChan 使用了 chan net.Conn + for … range 的实现, workerPool 的 chan是无缓冲chan, 保证了 在启动的时候, 异步 goroutine 在启动之后, sender 才能发送数据. 每次处理完一个连接, 就会返回到 ready队列, 等待下一个 连接的处理.

如果被关闭了, 那么就会被归还到 workerChanPool 中, 最终在下一次gc的时候被释放掉.