这里分析Broker的主要实现.

impl

  • 基于Netty实现网络层, ServerCnx.java 是网络层和Broker交互的媒介
  • Consumer: 消费者,
  • Producer: 生产者, 负责消息投递状态的管理,producer统计.
  • Topic: 负责管理 Producer&Subscription, 以及消息的存储
  • Subscription: 负责管理一组consumer. 支持 Exclusive/Failover/Shared 三种订阅模型, 每个订阅有自己的cursor和Dispatcher.
  • Dispatcher: 负责消息的分发以及消费者的管理[添加、删除]
  • ManagedCursor: cursor管理
  • ManagedLedger: 负责数据的存储, 主要是 消息数据 和 cursor数据

流程

  1. 消息的生产/消费 参照 生产流程分析消费流程分析;这里简单描述:

    Producer send message -> Network -> Netty Server -> Topic -> Subscription -> Dispacher -> Consumer -> Netty Server -> Network -> Consumer 
    
  2. 消息redelivery

    后面补充!

  3. 消息去重

    通过比较 message.sequenceId 和 内存中相应Producer的 lastSequenceIdPushed 的大小, 判定 消息是否发送过. 在 Producer 中, 消息是递增的. 无论是 Broker 还是 client Producer 的实现中, sequenceId 都只在内存中进行维护, 不进行存储.

  4. Cursor compaction

  5. Topic compaction

  6. 分层存储

  7. 注册和高可用

faq

  1. topic是按照什么规则分配给 broker的呢? 又是什么时候分配的呢?
  • 在topic创建的时候是不会分配的. 可以参照 TopicsImpl#createPartitionedTopic, 创建完topic, 会在zk上存储 /admin/partitioned-topics/namespace/domain/topicName -> partitions 的json数据, 可以通过脚本实现topic的创建
  • 除次之外, 在 PARTITIONED_METADATA 交互协议中, 如果topic不存在, 就作为不是partition的topic, 直接返回partition=0的响应, 不会拒绝请求, 也不会去创建topic.
  • 在Lookup交互协议的时候, 尝试获取 topic 的 own broker 地址的时候, 如果topic没有 owner broker, 会进行选举最低负载的broker. 方法可以参看 NamespaceService#findBrokerServiceUrl 和 #searchForCandidateBroker.
  • 值得注意的是, own broker不是根据topic粒度确定的, 而是以 NamespaceBundle 的资源粒度确定的, 多个topic在一个NamespaceBundle上, 作为整体调度, 比如broker负载过重的时候, 可以将部分NamespaceBundle迁移到其他低负载的broker上.
  1. xxx