One minute
Kafka Broker
Kafka 的服务端 主要有以下几个部分组成:
-
GroupCoordinator: 负责consumer的成员管理和offset管理, 一个集群有多个 GroupCoordinator, consumer group 根据 group names被分派到 consumer group 的一个partition, 作为这个consumer group的 coordinator.
-
KafkaController: 负责监听 zk 处理 topic/parititon/broker 等信息, 一个集群只有一个 KafkaController
-
SocketServer: 自定义的socketServer, 1 Acceptor & n Processor.
-
KafkaApis: 负责处理相应的client&follower的请求和响应
-
TimingWheel: 时间轮的实现, kafka broker中 DelayedJoin DelayedHeartbeat 等延迟等待的事件的实现
-
Log: 日志层的管理, 包括多个 LogSegment
/**
- An append-only log for storing messages.
- The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
- ……. */
-
ReplicaStateMachine: 负责副本的管理. kafka Controller 竞选成为leader后, 会启动 ReplicaStateMachine
-
PartitionStateMachine: 负责partition的管理
faq:
- 关于zero copy 是怎么回事? Kafka实现中, consumer fetch的时候, 不是copy 文件的内容在发送出去的, 而是 只是引用 ByteBuffer 对象发送 socket, 最终调用java的 FileChannelImpl#transferTo0 的native实现.
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
jobject srcFDO,
jlong position, jlong count,
jobject dstFDO)
{
jint srcFD = fdval(env, srcFDO);
jint dstFD = fdval(env, dstFDO);
#if defined(__linux__)
off64_t offset = (off64_t)position;
jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count); // 关键
if (n < 0) {
if (errno == EAGAIN)
return IOS_UNAVAILABLE;
if ((errno == EINVAL) && ((ssize_t)count >= 0))
return IOS_UNSUPPORTED_CASE;
if (errno == EINTR) {
return IOS_INTERRUPTED;
}
JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
return IOS_THROWN;
}
return n;
#elif defined (__solaris__)
.....
在linux系统中, 最终使用 sendfile64 指令
186 Words
2019-03-03 21:40 +0800