kafka 支持两种模式: standalone 和 distributed. 分开来讲.

组件

kafka-connect 主要的组件如下

  • Herder: 管理 worker、statusBackingStore、configBackingStore、offsetBackingStore.
  • Connect: kafka-connect 组件的生命周期的管理
  • statusBackingStore: task/worker 状态变化, 都需要调用这个
  • offsetBackingStore: 管理offset
  • configBackingStore: 管理配置
  • ConfigProvider: 管理配置信息

standalone

standalone 的实现比较简单, 这里简单说下 生命周期.

生命周期:

流程: 
ConnectStandalone#main -> StandaloneHerder#putConnectorConfig -> StandaloneHerder#startConnector -> 启动 connector + 启动 task 

启动 connector:
     Worker#startConnector -> WorkerConnector#start (管理connector的生命周期) -> Connector#start
启动task:
    Worer#startTask -> WorkerTask#run...execute (executorService执行)

StatusBackingStore: MemoryStatusBackingStore 
MemoryConfigBackingStore: MemoryConfigBackingStore

standalone的依赖 XXXBackingStore 都是 memory 的实现, 不赘述.

学习

kafka-connect 很喜欢用 callback 参数, 将结果callback 出去, 一定程度上能够增强 并发度

分布式

kafka 的分布式 和 kafka standalone, 除了在 storage 上的使用不同(KafkaConfigBackingStore、KafkaStatusBackingStore、KafkaOffsetBackingStore), 还使用了 group member protocol 管理分布式的worker, connector/task 的任务分配 通过 group member protocol 管理. 为了避免单个worker挂掉导致任务的重新分配, 使用 IncrementalCooperativeAssignor 管理任务的分配, 在这个实现中, 如果 单个worker 挂了, 这个worker的任务就会移动到其他worker中, 而不是无脑的分配所有的worker.

关于任务分配的主要类:

  • DistributedHerder: 在 joinPrepare, 暂停所有 connector/task; joinComplete, 记录结果
  • WorkerCoordinator: group member 协议处理 worker member 的加入 退出. metadata 中存放了 分配情况: ExtendedAssignment. poll 行为不了解…… client.poll 是为了触发 rebalance?
  • ExtendedAssignment: 除了包含 分配的 connector 和 tasks, 还有 revoked 的 connector 和 tasks.
  • IncrementalCooperativeAssignor的学习, 需要理解下 kip: https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect. 先分配 connector, 再分配 tasks, 而不是 connector分配一个, 在分配 connector 的task, 避免 单task的connector情况 导致 奇偶worker 分配不均匀
  • KafkaConfigBackingStore: 这个存储需要处理 compacted 的复杂情况.配置都是先 producer 发送消息, consumer 拉取配置后才放到本地的, 整体配置信息进行了异步化. 除此之外, 通过 offset 信息, 表示当前的 kafka-connect worker 的状态. 所以在 ConnectProtocol#WorkerState 中, 使用了 offset 字段作为状态信息. 在 group member protocol 管理的任务分配环节, 需要验证 leader 的 KafkaConfigBackingStore 中的 offset 和 成员中最大的 offset 的关系, 需要确保 leader offset 是最大的, 否则不能进行分配.
  • KafkaOffsetBackingStore: 同上
  • KafkaStatusBackingStore: 同上

问题

  • 新加入的 connector是怎么分配的呢?

DistributedHerder 会定时调用tick, tick中有一个步骤: handleRebalanceCompleted; 每次rebalance结束, 就会启动新增的 assignment(task/connector). 在调用 handleRebalanceCompleted 需要确保 自己在 group member protocol 活着, 通过 #poll 确定.

  • ExtendedAssignment 中 revokedConnectorIds revokedTaskIds 的设计眼里不了解
  • WorkerState 中 url 和 offset; offset 是 configBackingStore 中的offset, 在分布式中, 就是 KafkaConfigBackingStore 的 offset (consumer消费到的offset), 表示当前 worker 最新的状态; url 是 Header的, 每个集群一个 Header, 对应的 url 是 source的.
  • AbstractCoordinator: group memeber protocol 的实现骨架. 找到一个Node -> find coordinator protocol -> onJoinPrepare(子类) -> join group protocol -> sync group protocol(onJoinLeader 包含了任务分配的结果/follower 空的assignment) -> enable heartbeat ->onJoinComplete(子类处理分配结果). 心跳线程处理 coordinator的网络连接. leader 是 coordinator 选举的. 注意, 在配置的时候, 每个source集群需要配置一个不一样的 groupId.

疑惑

group member protocol 的kip 没找到, 很疑惑

Isolation:

类隔离的本质就是 通过不同的classloader 来隔离不同的实现. 在 kafka-connect 中, Source/Sink/Connector/Converter/Transformation/ConfigProvider/RestConnection/ConnectorClientOverridePolicy 都是插件, 允许用户自定义实现, kafka-connect通过加载用户指定的路径信息加载这些插件. 需要注意的是, 用户的指定路径可以是多个目录; 其次, 这些插件不希望彼此的依赖相互影响.

在看实现的时候, 有两个 classLoader:

  1. PluginClassLoader: 每一个配置的 jar文件/.class文件 都是一个 类加载器, 实现隔离机制. 在需要隔离的类(不在黑名单或者在白名单), 用 UrlClassLoader 进行加载, 否则用 父类 ClassLoader 进行加载 (在实现的时候, 有些调用来自connect"主路径”, 有些来自connector/task, 后者肯定已经在类隔离机制中, 所以, 在调用实例化 converter 等对象的时候, 使用当前 classloader 就可以了)
  2. DelegatingClassLoader: 维护扫描到的用户添加的类信息

在扫描用户类路径的时候, 有两种扫描的方式:

  1. reflection 获取具体的类: Connector/Converter/HeaderConverter/Transformation: 用户多种自定义实现. 一个 kafka-connect 有多个实现.
  2. 利用 serviceLoader 获取 具体的实现类: ConfigProvider/ConnectRestExtension/ConnectorClientConfigOverridePolicy: 一个 kafka-connect 就一个.

学习

  1. org.reflection 包不错. 可以从 url 路径列表下 获取 类信息

  2. 在实现类隔离机制的时候, 通过 ClassLoader#ParallelLoaders 方法扩大并行度. 在类加载过程中, 如果是 允许并行的话, 锁住的是 每个类名字; 否则的话, 锁住的是 ClassLoader 当前实现类对象.

rest

servlet 设计. 暂时不赘述.

疑惑