这里分析Broker的主要实现.

impl

  • 基于Netty实现网络层, ServerCnx.java 是网络层和Broker交互的媒介
  • Consumer: 消费者,
  • Producer: 生产者, 负责消息投递状态的管理,producer统计.
  • Topic: 负责管理 Producer&Subscription, 以及消息的存储
  • Subscription: 负责管理一组consumer. 支持 Exclusive/Failover/Shared 三种订阅模型, 每个订阅有自己的cursor和Dispatcher.
  • Dispatcher: 负责消息的分发以及消费者的管理[添加、删除]
  • ManagedCursor: cursor管理
  • ManagedLedger: 负责数据的存储, 主要是 消息数据 和 cursor数据

流程

  1. 消息的生产/消费 参照 生产流程分析消费流程分析;这里简单描述:

    Producer send message -> Network -> Netty Server -> Topic -> Subscription -> Dispacher -> Consumer -> Netty Server -> Network -> Consumer 
    
  2. 消息redelivery

    后面补充!

  3. 消息去重

    通过比较 message.sequenceId 和 内存中相应Producer的 lastSequenceIdPushed 的大小, 判定 消息是否发送过. 在 Producer 中, 消息是递增的. 无论是 Broker 还是 client Producer 的实现中, sequenceId 都只在内存中进行维护, 不进行存储.

  4. Cursor compaction

  5. Topic compaction

  6. 分层存储

  7. 注册和高可用

faq

  1. topic是按照什么规则分配给 broker的呢? 又是什么时候分配的呢?
  2. 在topic创建的时候是不会分配的. 可以参照 TopicsImpl#createPartitionedTopic, 创建完topic, 会在zk上存储 /admin/partitioned-topics/namespace/domain/topicName -> partitions 的json数据, 可以通过脚本实现topic的创建
  3. 除次之外, 在 PARTITIONED_METADATA 交互协议中, 如果topic不存在, 就作为不是partition的topic, 直接返回partition=0的响应, 不会拒绝请求, 也不会去创建topic.
  4. 在Lookup交互协议的时候, 尝试获取 topic 的 own broker 地址的时候, 如果topic没有 owner broker, 会进行选举最低负载的broker. 方法可以参看 NamespaceService#findBrokerServiceUrl 和 #searchForCandidateBroker.
  5. 值得注意的是, own broker不是根据topic粒度确定的, 而是以 NamespaceBundle 的资源粒度确定的, 多个topic在一个NamespaceBundle上, 作为整体调度, 比如broker负载过重的时候, 可以将部分NamespaceBundle迁移到其他低负载的broker上.
  6. xxx