基本概念

在pulsar中, 为了支持多租户, 有 tenant和namespace 概念. tenant和租户相关, 一个tenant一般是一个 team. namespace 可以区别不同的项目, 一个项目可以有多个topic. 在实现的过程中, 为了更好的支持资源调度的概念, 使用 NamespaceBundles 对象, NamespaceBundles 中有多个 NamespaceBundle, 根据负载, NamespaceBundle 会进行迁移和split. topic在资源上跟着bundle走. 查看NamespaceBundles定义:

public class NamespaceBundles {
    private final NamespaceName nsname;
    private final ArrayList<NamespaceBundle> bundles;
    private final NamespaceBundleFactory factory;
    private final long version;

    protected final long[] partitions;

    public static final Long FULL_LOWER_BOUND = 0x00000000L;
    public static final Long FULL_UPPER_BOUND = 0xffffffffL;
    private final NamespaceBundle fullBundle;

每个bundle负责一段range. 一开始的时候, 只有一个bound, range是 [0x00000000L,0xffffffffL], 后面随着负载过高, 会进行split, 拆分成多个bundle, 每个bundle由一个broker负责, 一个broker可能负责多个bundle. 当topic创建的时候, 会根据算法选择 bundle, 落地到bundle所在的broker上,
那么, 使用range有什么好处呢? 通过bundle抽象topic的资源概念, 可以更好的进行调度. 使用range的方式, 更容易扩容. 但是, 目前的实现上,没办法缩容.

分析NamespaceBundles的代码的时候, 重点关注下面几个流程:

bundle元数据

元数据存储在zk上, path: /namespace/NamespaceName/lowKey_highKey, 存储的value对象是 NamespaceEphemeralData, 也就是broker的信息.

bundle创建

刚创建服务的时候, 调用NamespaceService#registerBootstrapNamespaces, 会先注册 heartbeatNamespace, 在尝试注册其他Namespace, 每次获取到namespace 的 own, 就需要加载namespace下所有的topic.
一个Namespace只有一个owner

bundle split

split流程参考 NamespaceService#splitAndOwnBundle 的注释

/**
     * 1. split the given bundle into two bundles 2. assign ownership of both the bundles to current broker 3. update
     * policies with newly created bundles into LocalZK 4. disable original bundle and refresh the cache.
     *
     * It will call splitAndOwnBundleOnceAndRetry to do the real retry work, which will retry "retryTimes".
*/

split的调用路径:

  1. 手动触发split admin url path:/{tenant}/{namespace}/{bundle}/split 调用路径:
Namespaces#splitNamespaceBundle
-> NamespaceBase#internalSplitNamespaceBundle -> NamespaceService#splitAndOwnBundle -> #splitAndOwnBundleOnceAndRetry 

实现上, 就是将 新分裂的 NamespaceBundle 分派给 当前的broker.

选主

整个服务只有一个leader broker, 所有tenant只有一个leaderBroker.

pulsar broker启动的时候, 会先竞争 namespace owner, 竞选通过zk临时节点路径 /loadbalance/leader 实现. 竞争成功/失败则获取成功之后, 记录leader broker信息, 触发监听器, 进行 LoadManager#doLoadShedding 和 LoadManager#writeResourceQuotasToZooKeeper, 实现负载动态迁移. 启动参照 PulsarService#start,

public void start() throws PulsarServerException {
    ...
    this.loadManager.set(LoadManager.create(this));
    startLeaderElectionService();
    ....
}

竞选参照 PulsarService#startLeaderElectionService 和 LeaderElectionService#start.
其中, 选主后使用的功能:

  • LoadManager#doLoadShedding: 找到高负载的broker, 将broker中的bundle迁移到 Namespace中其他的broker, 然后 http服务调用 admin路径 {tenant}/{namespace}/{bundle}/unload 实现.
  • LoadManager#writeResourceQuotasToZooKeeper: 将 namespace bundle的resource quotas写入到ZooKeeper路径: /loadbalance/resource-quota/namespace/NamespaceBundleName

补充admin的处理: url路径实现是 Namespaces#unloadNamespaceBundle, 调用链

Namespaces#unloadNamespaceBundle -> NamespaceBase#internalUnloadNamespaceBundle -> NamespaceService#unloadNamespaceBundle -> #unloadNamespaceBundle -> OwnedBundle#handleUnloadRequest -> OwnershipCache#removeOwnership

最终通过删除zk节点 /namespace/namespaceName/lowKey_highKey 实现

补充 admin broker是怎么选取出来的呢? 就是当前broker的地址.

补充 resource-quota 是怎么计算出来的呢? 计算的触发流程:

ZooKeeperChildrenCache#reloadCache/ZooKeeperDataCache#reloadCache -> SimpleLoadManangerImpl#onUpdate -> #updateRanking -> #updateRealtimeResourceQuota

计算的算法参考: SimpleLoadManangerImpl#updateRealtimeResourceQuota

bundle迁移

参照选主流程: LoadManager#doLoadShedding, 计算高负载的时候, 只会删除bundle的owner在zk节点上的信息. 当bundle不存在的时候, ????

topic查找

每个broker都会创建 OwnershipCache, 通过zk的交互, 获取 brokers上own的NamespaceBundle. 查看注释信息

/**
 * This class provides a cache service for all the service unit ownership among the brokers. It provide a cache service
 * as well as ZooKeeper read/write functions for a) lookup of a service unit ownership to a broker; b) take ownership of
 * a service unit by the local broker
 *
 *
 */

在实现上, 将自己的信息注册到zk的临时节点, 当broker挂了, own的信息就没有了, 那么, bundle就可以迁移了.

private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<String, OwnedBundle> {

    public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
        this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
        this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
        this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
                pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
        this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
                pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
        this.bundleFactory = bundleFactory;
        this.localZkCache = pulsar.getLocalZkCache();
        this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
        // ownedBundlesCache contains all namespaces that are owned by the local broker
        this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.directExecutor())
                .buildAsync(new OwnedServiceUnitCacheLoader());
    }

    @SuppressWarnings("deprecation")
    @Override
    public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Executor executor) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Acquiring zk lock on namespace {}", namespaceBundleZNode);
        }

        byte[] znodeContent;
        try {
            znodeContent = jsonMapper.writeValueAsBytes(selfOwnerInfo);
        } catch (JsonProcessingException e) {
            // Failed to serialize to JSON
            return FutureUtil.failedFuture(e);
        }

        CompletableFuture<OwnedBundle> future = new CompletableFuture<>();
        ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent,
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
                    if (rc == KeeperException.Code.OK.intValue()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode);
                        }
                        ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
                        // ??
                        future.complete(new OwnedBundle(
                                ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));
                    } else {
                        // Failed to acquire lock
                        future.completeExceptionally(KeeperException.create(rc));
                    }
                }, null);

        return future;
    }
}

在上面的实现中, 有两个有趣的缓存对象:

  • ownershipReadOnlyCache: 查看bundle的owner信息.
  • ownedBundlesCache: 自己的 bundle own信息, 本地缓存. 两个缓存都是基于zk实现的, ownershipReadOnlyCache 是普通的实现, ownedBundlesCache做了定制化, 在asyncLoad函数中, 用自己的broker信息创建临时节点, 实现namespace self owned 的功能.

topic的创建、落地broker

  • topic创建的时候是不会绑定到NamespaceBundle上的, 会在zk上存储 /admin/partitioned-topics/namespace/domain/topicName -> partitions 的json数据.
  • 在Lookup交互协议的时候, 尝试获取 topic 的 own broker 地址的时候, 如果topic没有 owner broker, 会进行选举最低负载的broker. 方法可以参看 NamespaceService#findBrokerServiceUrl 和 NamespaceService#searchForCandidateBroker.

在方法 NamespaceService#findBrokerServiceUrl 上, 会先选择topic要分配的bundle. 分配算法如下:

// topic的hash算法在初始化NamespaceService的时候确定的, 目前是写死的crc32.
public NamespaceService(PulsarService pulsar) {
    this.pulsar = pulsar;
    host = pulsar.getAdvertisedAddress();
    this.config = pulsar.getConfiguration();
    this.loadManager = pulsar.getLoadManager();
    ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
    this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); // 这里竟然分配了
    this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
    this.namespaceClients = new ConcurrentOpenHashMap<>();
}

// 分配Bundle的算法 
protected NamespaceBundle getBundle(long hash) {
    int idx = Arrays.binarySearch(partitions, hash);
    int lowerIdx = idx < 0 ? -(idx + 2) : idx;
    return bundles.get(lowerIdx);
}

在方法 NamespaceService#searchForCandidateBroker 上, 会先选择 candidateBroker, 然后将结果返回. candidateBroker的选择有如下几种:

  1. Heartbeat or SLAMonitor namespace, 直接获取信息中broker地址
  2. (!this.loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()); 会选择最低负载的broker.
  3. 如果2不满足, 也就是 this.loadManager.get().isCentralized() &&!pulsar.getLeaderElectionService().isLeader(), 如果是 authoritative mode, 就选择当前的broker, 不然选取 leader broker.
  4. 如果是 选择了当前的broker, 会尝试通过zookkeeper进行own bundle. 成功own的情况下, 会获取Namespace下所有的topic, 过滤获取 当前bundle的所有topic, 在本地创建信息, 建立ManagedLedger信息.
  5. 如果没有选择当前的broker, 就会通过 zk 获取响应的broker信息. done

broker挂了

前面说到, bundle 是在一个broker, broker通过注册临时节点, 通知了其他broker. 所以, 当broker挂掉的时候, 临时节点就会注销, 那么可以根据这个实现 bundle迁移, 但是, 是怎么实现的呢?

broker怎么办?

producer/consumer启动的时候, 有两种方式连接到 LOOKUP 服务, 一种是直连到 pulsar broker, 另一种是连接到 http服务上, 如果是前面一种方式, broker挂了, 整个服务就不可用了, 而且如果使用第一种方式, 一开始创建的所有topic的bundle都会在一个broker上. 如果是http服务, 那么, 如果服务后面有多个broker, 一个broker挂了的情况, 后面的topic请求过来的时候, 根据topic创建的流程走, 会落在当前的broker上.

faq

一个问题, authoritative 改怎么设置呢? loadManager.get().isCentralized()? 为什么这么设计?

总结

  1. 整个服务只有一个 leader broker, 负责进行资源统计、bundle迁移.
  2. NamespaceBundle对资源进行了抽象, NamespaceBundle负责多个topic, 一个Namespace下有多个NamespaceBundle
  3. topic创建的时候, 会按照字符串hash + 取余的操作将topic放在一个NamespaceBundle中,方便后面资源调度
  4. bundle没有merge操作
  5. broker如果挂了, bundle会由当前Lookup服务的broker负责当前bundle