Kafka 的服务端 主要有以下几个部分组成:

  1. GroupCoordinator: 负责consumer的成员管理和offset管理, 一个集群有多个 GroupCoordinator, consumer group 根据 group names被分派到 consumer group 的一个partition, 作为这个consumer group的 coordinator.

  2. KafkaController: 负责监听 zk 处理 topic/parititon/broker 等信息, 一个集群只有一个 KafkaController

  3. SocketServer: 自定义的socketServer, 1 Acceptor & n Processor.

  4. KafkaApis: 负责处理相应的client&follower的请求和响应

  5. TimingWheel: 时间轮的实现, kafka broker中 DelayedJoin DelayedHeartbeat 等延迟等待的事件的实现

  6. Log: 日志层的管理, 包括多个 LogSegment

/**

  • An append-only log for storing messages.
  • The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
  • ……. */
  1. ReplicaStateMachine: 负责副本的管理. kafka Controller 竞选成为leader后, 会启动 ReplicaStateMachine

  2. PartitionStateMachine: 负责partition的管理

faq:

  1. 关于zero copy 是怎么回事? Kafka实现中, consumer fetch的时候, 不是copy 文件的内容在发送出去的, 而是 只是引用 ByteBuffer 对象发送 socket, 最终调用java的 FileChannelImpl#transferTo0 的native实现.
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jobject srcFDO,
                                            jlong position, jlong count,
                                            jobject dstFDO)
{
    jint srcFD = fdval(env, srcFDO);
    jint dstFD = fdval(env, dstFDO);

#if defined(__linux__)
    off64_t offset = (off64_t)position;
    jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);  // 关键
    if (n < 0) {
        if (errno == EAGAIN)
            return IOS_UNAVAILABLE;
        if ((errno == EINVAL) && ((ssize_t)count >= 0))
            return IOS_UNSUPPORTED_CASE;
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        }
        JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
        return IOS_THROWN;
    }
    return n;
#elif defined (__solaris__)
    .....

在linux系统中, 最终使用 sendfile64 指令