Preface

最近IO层面的深入比较多, 顺便研究下 Kafka 底层IO实现,

深入

直接看下 socketServer 的注释 学习下理论.

acceptor

/**
 * Handles new connections, requests and responses to and from broker.
 * Kafka supports two types of request planes :
 *  - data-plane :
 *    - Handles requests from clients and other brokers in the cluster.
 *    - The threading model is
 *      1 Acceptor thread per listener, that handles new connections.
 *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
 *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
 *  - control-plane :
 *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
 *      If not configured, the controller requests are handled by the data-plane.
 *    - The threading model is
 *      1 Acceptor thread that handles new connections
 *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
 *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
 */
class SocketServer {
	......
}

根据注释, Socketserver 无论是 数据面还是 控制面 的底层都是 Reactor IO模型, 每个listener有一个 NIOSelector 处理accept的连接, 对于建立的连接, 会分给 Processor 的线程处理, Processor 会将每个 新建的连接重新挂在到 processor 的 NIOSelector, 用来监听 读写事件.

这里存在一个问题, 新创建的socket 怎么决定分配给 哪个 processor? 可以看下 run 方法, 其实也是 round-robin 实现.

private[kafka] class Acceptor {

  /**
   * Accept loop that checks for new connection attempts
   */
  def run(): Unit = {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()

                if (key.isAcceptable) {
                  accept(key).foreach { socketChannel =>
                    // Assign the channel to the next processor (using round-robin) to which the
                    // channel can be added without blocking. If newConnections queue is full on
                    // all processors, block until the last one is able to accept a connection.
                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      processor = synchronized {
                        // adjust the index (if necessary) and retrieve the processor atomically for
                        // correct behaviour in case the number of processors is reduced dynamically
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                    // round-robin 实现
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
      }
    }
}

这里的 Acceptor 就是 kafka 对 reactor 中 acceptor的抽象, 用来处理新创建的连接. 需要注意的是, 虽然是 roudn-robin, 但并不是简单的实现, 在 processor 模型中, processor 也是队列的模型, 每个请求也是先投递到队列中, 如果队列满了话, 就会尝试下一个, 但是如果下一个也满了的话, 会轮询下一个, 知道最后一个, 这个时候会一直堵塞. 可以看 processor 的accept 逻辑

/**
 * Thread that processes all requests from a single connection. There are N of these running in parallel
 * each of which has its own selector
 */
private[kafka] class Processor {
  /**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel,
             mayBlock: Boolean,
             acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
    val accepted = {
      if (newConnections.offer(socketChannel))
        true
      else if (mayBlock) {  // 只有最后一个才会 blocking
        val startNs = time.nanoseconds
        newConnections.put(socketChannel)
        acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
        true
      } else
        false
    }
    if (accepted)
      wakeup()
    accepted
  }

Quota

broker IO 实现清晰了很多, 但是broker 如何自我保护呢? 比如 连接数太多 以及 client produce/consume 太多的问题?

connection quota

broker支持 连接数上限的设置. 当客户端连接数过多超过上限, broker 获取到这个连接之后, 就会等待quota, 并且是无限等待, 直到有其他连接关闭释放 quota. 可以参考 SocketServer.scala 中的 ConnectionQuotas 实现. 当 Acceptor#accept 获取到客户端连接的时候, 会调用 ConnectionQuotas#inc 尝试获取 quota, quota 内部使用 mutable.Map 实现, 这个数据结构除了map的常规用法, 还提供了 wait 和 notify 语意. 当连接关闭的时候, 会调用 ConnectionQuota#dec 释放quota, 调用 mutable.Map#notify 通知阻塞的连接等待.

在配置方面, 既支持 broker 最大连接数, 也支持每个listener的连接数配置

客户端 Quota

在 0.9 版本, kakfa 提供了这个功能, 0.10 版本添加了 zk 的配置.

client quota 是通过 限速实现的, 限制每秒的字节数, 并不会返回错误 (返回错误会导致客户端重试和退避实现, 显得复杂), 而是通过增加延迟响应 降低客户端的处理能力, 限速的实现基于 clientId 维度, clientId 通过写入zk的方式进行交互, 这样配置变更就不需要重启集群了. 默认10MB每秒. 因为基于 clientId 维度, 那么当 clientId 被设置成 空字符串的时候, kafka 又是怎么处理的呢? kafka 内部会让 这些空 clientId 的client共享 quota. 需要注意的是, 这个quota设置是每个broker的, 而不是总的broker的quota, 当前的设计更多的是保护了单个broker.

  • 实现:
prdocuer: 通过将响应放到 `DelayedOperationPurgatory`, 注意, 这个时候其实已经写入磁盘了(避免内存占用). 根据前面的 sarama 发送有序的讨论, 这样可以显著降低客户端的请求qps

consumer: 类似producer, 但是是在拉取消息之前进入 `DelayedOperationPurgatory`(避免内存占用)

延迟时间计算: Delay Time = (overall produced in window - quotabound)/Quota limit per second

一秒的窗口 越小越好,但是建议10

客户端quota主要参考: QuotaFactory.scala 和 ClientQuotaManager.scala, 因为需要延迟响应, 会将请求放到 delayQueue 做延迟处理, 延迟到期的处理参考 ThrottledChannelReaper

调用路径:

KafkaApis# handleProduceRequest 内部方法 sendResponseCallback -> ClientQuotaManager#maybeRecordAndGetThrottleTimeMs -> KafkaApis# handleProduceRequest 内部方法 sendResponseCallback -> ClientQuotaManager#throttle -> ThrottledChannelReaper 处理超时到期的消息, 执行回调 -> KafkaApis#sendResponse 

一些特殊的数据结构

metrics & sensor & quota

kafka metrics 的设计有些绕. KafkaMetric 主要抽象成两类:

  • MetricValueProvider 获取数据读取,有两种实现, 自定义的 Measurable 或者 监控的 Gauge 对象. Measurable 用于各种自定义实现, 比如 Metadata的更新间隔, 除此之外, 还有 stat 扩展实现了 Measurable, 比如 max, min, avg, rate 等常用指标 (对应 SampledStat 的几种实现)
  • kafkaMetric 是暴露的统一数据接口, 除了提供value()方法之外, 内部存储了 MetricConfig, 可以用于quota

举个例子: Sender$SenderMetrics.java

除了metrics, kafka 还提供了 sensor 的抽象, 用来处理一段时间的数据, 并且支持 quota 的判断, sensor 内部组合了 stat 和 KafkaMetric. 但是我们上面提到, stat 本质还是 KafkaMetric对象, 为什么 sensor 要单独维护两个呢? 我们上面谈到, KafkaMetrics 的指标数据是 MetricValueProvider 提供的, 本身并没有更新和记录的能力, 因此在调用 sensor#record 更新指标数据的时候, 需要 stat 进行更新, 进而更新到 KafkaMetric, sensor中的 KafkaMetrics 仅仅用来检查 是否超过 Quota, 如果超过, 就抛出异常

另外, 对于指定的一个指标, 可能有多种统计维度, 这个就需要抽象一个 Sensor 组合多个 KafkaMetric

一般情况下, KafkaMetric 不会设置 quota, 大部分是空的, 比如上面提供的 Sender$SenderMetrics.java

上面讲到的 客户端Quota 就是基于 sensor实现的

参考

https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas https://docs.cloudera.com/runtime/7.1.0/kafka-configuring/topics/kafka-config-quotas.html http://kafka.apache.org/documentation/#design