Preface

最近发现sarama的收发延迟很高, 内部研究发现 sarama抽象的broker交互 底层竟然是同步阻塞的调用: sendAndReceive, 比如:

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
	var response *ProduceResponse
	var err error

	if request.RequiredAcks == NoResponse {
		err = b.sendAndReceive(request, nil)
	} else {
		response = new(ProduceResponse)
		err = b.sendAndReceive(request, response)  // 同步阻塞了
	}

	if err != nil {
		return nil, err
	}

	return response, nil
}

这里的broker对象就是 sarma 对 kafka broker 连接的抽象, 从上面可以发现, 对于每个生产请求, 都是顺序发送, 并且下一个请求必须等待上各个请求接收到相应 才能发送. 于是直观的想法就是, 异步的send 和 receive, 也就是说下一个请求并不需要等待上一个请求收到响应.

深入

使用的版本的最新commit是: 4b03d67e106c6d3b9dd465a308a62b400ead70a4 , 时间是: Sat Mar 7 08:46:38 2020 -0800

处理流程

但是这里涉及到一个条件, 就是服务端处理的并发性/有序性. 如果客户端顺序连续发送多条消息, 服务端怎么处理呢? 服务端会保证有序性呢?

这里我们看下 kafka 的设计, 其实, kafka 是一个socketServer, socketServer 将接受的请求发送一个大的RequestChannel, 然后多个线程并发的从RequestChannel中获取请求. 处理逻辑如下:

class KafkaRequestHandler {
  def run(): Unit = {
    while (!stopped) {
    	......
    	val req = requestChannel.receiveRequest(300)
    	req match {
    		......
    		case request: RequestChannel.Request =>
    		  try {
    		  	......
	            apis.handle(request)  // 调用 KafkaApis, 是真正的逻辑处理, 比如 处理发送消息的逻辑
	          } catch {
	            case e: FatalExitError =>
	              shutdownComplete.countDown()
	              Exit.exit(e.statusCode)
	            case e: Throwable => error("Exception when handling request", e)
	          } finally {
	            request.releaseBuffer()
	          }
    	}
    }
  }
}

class KafkaRequestHandlerPool {    // 启动handler的线程池
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
  ......
}

这里的 KafkaRequestHandler 是多个线程启动的, requestChannel 是多个线程共享的, 启动就是上面的 KafkaRequestHandlerPool, 那么哪里组装他们的呢?

class KafkaServer{
  def startup(): Unit = {

  	// 创建处理的主流程对象
  	dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager,   
  	    adminManager, groupCoordinator, transactionCoordinator,
          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
          fetchManager, brokerTopicStats, clusterId, time, tokenManager)
  	// 启动处理的线程池, 组合了dataPlaneRequestChannel 和 dataPlaneRequestProcessor
  	dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,   			 						 socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

  }
}

可以发现 KafkaServer 在启动的时候, 在创建 dataPlaneRequestHandlerPool 对象的时候, 就是 KafkaRequestHandlerPool, 就传递了共享的 dataPlaneRequestChannel 和 dataPlaneRequestProcessor , 这个名字更多的借鉴了 service mesh 的理念. 其中 dataPlaneRequestChannel 是在创建socketServer的时候初始化的, 并且大小可以在 config 中指定:

class SocketServer {
	val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)  // 对象创建就会立即初始化
}

因此, 将上面的内容串起来就是:

1. 网络层将请求投递到 RequestChannel 
2. 多线程的 KafkaRequestHandler 并发的从 RequestChannel 获取请求, 交给 kafkaApis 处理

问题

顺序性的保证

那么问题就来了, 对于一个连接上的多个请求, 比如顺序并连续的投递到同一个 kafkaServer 上, 对应的kafka线程处理则是 并发乱序的, 因此kafkaServer 并没有保证 请求的顺序性, 回顾 kafka client 的设计, kafka client 是顺序阻塞网络调用的, 因此 kafka 消息的顺序性 也就是 kafka client 保证的.

但是, 对于大部分kafka server, 真的那么需要顺序性吗?有时候仅仅希望迅速的将请求发送出去, 提高并发量. 对于日志场景, 对同一个 rootId, 希望在一个partition 里面, 可以通过这样的方式提高并发吞吐量, 对于短时间的无序, 日志场景本身就会有一段时间聚合的逻辑, 因此短时的无序 也会通过 这个时间窗口实现有序.