4 minutes
Pulsar Bundle
基本概念
在pulsar中, 为了支持多租户, 有 tenant和namespace 概念. tenant和租户相关, 一个tenant一般是一个 team. namespace 可以区别不同的项目, 一个项目可以有多个topic. 在实现的过程中, 为了更好的支持资源调度的概念, 使用 NamespaceBundles 对象, NamespaceBundles 中有多个 NamespaceBundle, 根据负载, NamespaceBundle 会进行迁移和split. topic在资源上跟着bundle走. 查看NamespaceBundles定义:
public class NamespaceBundles {
private final NamespaceName nsname;
private final ArrayList<NamespaceBundle> bundles;
private final NamespaceBundleFactory factory;
private final long version;
protected final long[] partitions;
public static final Long FULL_LOWER_BOUND = 0x00000000L;
public static final Long FULL_UPPER_BOUND = 0xffffffffL;
private final NamespaceBundle fullBundle;
每个bundle负责一段range. 一开始的时候, 只有一个bound, range是 [0x00000000L,0xffffffffL], 后面随着负载过高, 会进行split, 拆分成多个bundle, 每个bundle由一个broker负责, 一个broker可能负责多个bundle. 当topic创建的时候, 会根据算法选择 bundle, 落地到bundle所在的broker上,
那么, 使用range有什么好处呢?
通过bundle抽象topic的资源概念, 可以更好的进行调度. 使用range的方式, 更容易扩容. 但是, 目前的实现上,没办法缩容.
分析NamespaceBundles的代码的时候, 重点关注下面几个流程:
bundle元数据
元数据存储在zk上, path: /namespace/NamespaceName/lowKey_highKey, 存储的value对象是 NamespaceEphemeralData, 也就是broker的信息.
bundle创建
刚创建服务的时候, 调用NamespaceService#registerBootstrapNamespaces, 会先注册 heartbeatNamespace, 在尝试注册其他Namespace, 每次获取到namespace 的 own, 就需要加载namespace下所有的topic.
一个Namespace只有一个owner
bundle split
split流程参考 NamespaceService#splitAndOwnBundle 的注释
/**
* 1. split the given bundle into two bundles 2. assign ownership of both the bundles to current broker 3. update
* policies with newly created bundles into LocalZK 4. disable original bundle and refresh the cache.
*
* It will call splitAndOwnBundleOnceAndRetry to do the real retry work, which will retry "retryTimes".
*/
split的调用路径:
- 手动触发split admin url path:/{tenant}/{namespace}/{bundle}/split 调用路径:
Namespaces#splitNamespaceBundle
-> NamespaceBase#internalSplitNamespaceBundle -> NamespaceService#splitAndOwnBundle -> #splitAndOwnBundleOnceAndRetry
实现上, 就是将 新分裂的 NamespaceBundle 分派给 当前的broker.
选主
整个服务只有一个leader broker, 所有tenant只有一个leaderBroker.
pulsar broker启动的时候, 会先竞争 namespace owner, 竞选通过zk临时节点路径 /loadbalance/leader 实现. 竞争成功/失败则获取成功之后, 记录leader broker信息, 触发监听器, 进行 LoadManager#doLoadShedding 和 LoadManager#writeResourceQuotasToZooKeeper, 实现负载动态迁移. 启动参照 PulsarService#start,
public void start() throws PulsarServerException {
...
this.loadManager.set(LoadManager.create(this));
startLeaderElectionService();
....
}
竞选参照 PulsarService#startLeaderElectionService 和 LeaderElectionService#start.
其中, 选主后使用的功能:
- LoadManager#doLoadShedding: 找到高负载的broker, 将broker中的bundle迁移到 Namespace中其他的broker, 然后 http服务调用 admin路径 {tenant}/{namespace}/{bundle}/unload 实现.
- LoadManager#writeResourceQuotasToZooKeeper: 将 namespace bundle的resource quotas写入到ZooKeeper路径: /loadbalance/resource-quota/namespace/NamespaceBundleName
补充admin的处理: url路径实现是 Namespaces#unloadNamespaceBundle, 调用链
Namespaces#unloadNamespaceBundle -> NamespaceBase#internalUnloadNamespaceBundle -> NamespaceService#unloadNamespaceBundle -> #unloadNamespaceBundle -> OwnedBundle#handleUnloadRequest -> OwnershipCache#removeOwnership
最终通过删除zk节点 /namespace/namespaceName/lowKey_highKey 实现
补充 admin broker是怎么选取出来的呢? 就是当前broker的地址.
补充 resource-quota 是怎么计算出来的呢? 计算的触发流程:
ZooKeeperChildrenCache#reloadCache/ZooKeeperDataCache#reloadCache -> SimpleLoadManangerImpl#onUpdate -> #updateRanking -> #updateRealtimeResourceQuota
计算的算法参考: SimpleLoadManangerImpl#updateRealtimeResourceQuota
bundle迁移
参照选主流程: LoadManager#doLoadShedding, 计算高负载的时候, 只会删除bundle的owner在zk节点上的信息. 当bundle不存在的时候, ????
topic查找
每个broker都会创建 OwnershipCache, 通过zk的交互, 获取 brokers上own的NamespaceBundle. 查看注释信息
/**
* This class provides a cache service for all the service unit ownership among the brokers. It provide a cache service
* as well as ZooKeeper read/write functions for a) lookup of a service unit ownership to a broker; b) take ownership of
* a service unit by the local broker
*
*
*/
在实现上, 将自己的信息注册到zk的临时节点, 当broker挂了, own的信息就没有了, 那么, bundle就可以迁移了.
private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<String, OwnedBundle> {
public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), false);
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), true);
this.bundleFactory = bundleFactory;
this.localZkCache = pulsar.getLocalZkCache();
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
// ownedBundlesCache contains all namespaces that are owned by the local broker
this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.directExecutor())
.buildAsync(new OwnedServiceUnitCacheLoader());
}
@SuppressWarnings("deprecation")
@Override
public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Executor executor) {
if (LOG.isDebugEnabled()) {
LOG.debug("Acquiring zk lock on namespace {}", namespaceBundleZNode);
}
byte[] znodeContent;
try {
znodeContent = jsonMapper.writeValueAsBytes(selfOwnerInfo);
} catch (JsonProcessingException e) {
// Failed to serialize to JSON
return FutureUtil.failedFuture(e);
}
CompletableFuture<OwnedBundle> future = new CompletableFuture<>();
ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
if (rc == KeeperException.Code.OK.intValue()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode);
}
ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
// ??
future.complete(new OwnedBundle(
ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));
} else {
// Failed to acquire lock
future.completeExceptionally(KeeperException.create(rc));
}
}, null);
return future;
}
}
在上面的实现中, 有两个有趣的缓存对象:
- ownershipReadOnlyCache: 查看bundle的owner信息.
- ownedBundlesCache: 自己的 bundle own信息, 本地缓存. 两个缓存都是基于zk实现的, ownershipReadOnlyCache 是普通的实现, ownedBundlesCache做了定制化, 在asyncLoad函数中, 用自己的broker信息创建临时节点, 实现namespace self owned 的功能.
topic的创建、落地broker
- topic创建的时候是不会绑定到NamespaceBundle上的, 会在zk上存储 /admin/partitioned-topics/namespace/domain/topicName -> partitions 的json数据.
- 在Lookup交互协议的时候, 尝试获取 topic 的 own broker 地址的时候, 如果topic没有 owner broker, 会进行选举最低负载的broker. 方法可以参看 NamespaceService#findBrokerServiceUrl 和 NamespaceService#searchForCandidateBroker.
在方法 NamespaceService#findBrokerServiceUrl 上, 会先选择topic要分配的bundle. 分配算法如下:
// topic的hash算法在初始化NamespaceService的时候确定的, 目前是写死的crc32.
public NamespaceService(PulsarService pulsar) {
this.pulsar = pulsar;
host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); // 这里竟然分配了
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
this.namespaceClients = new ConcurrentOpenHashMap<>();
}
// 分配Bundle的算法
protected NamespaceBundle getBundle(long hash) {
int idx = Arrays.binarySearch(partitions, hash);
int lowerIdx = idx < 0 ? -(idx + 2) : idx;
return bundles.get(lowerIdx);
}
在方法 NamespaceService#searchForCandidateBroker 上, 会先选择 candidateBroker, 然后将结果返回. candidateBroker的选择有如下几种:
- Heartbeat or SLAMonitor namespace, 直接获取信息中broker地址
- (!this.loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()); 会选择最低负载的broker.
- 如果2不满足, 也就是 this.loadManager.get().isCentralized() &&!pulsar.getLeaderElectionService().isLeader(), 如果是 authoritative mode, 就选择当前的broker, 不然选取 leader broker.
- 如果是 选择了当前的broker, 会尝试通过zookkeeper进行own bundle. 成功own的情况下, 会获取Namespace下所有的topic, 过滤获取 当前bundle的所有topic, 在本地创建信息, 建立ManagedLedger信息.
- 如果没有选择当前的broker, 就会通过 zk 获取响应的broker信息. done
broker挂了
前面说到, bundle 是在一个broker, broker通过注册临时节点, 通知了其他broker. 所以, 当broker挂掉的时候, 临时节点就会注销, 那么可以根据这个实现 bundle迁移, 但是, 是怎么实现的呢?
broker怎么办?
producer/consumer启动的时候, 有两种方式连接到 LOOKUP 服务, 一种是直连到 pulsar broker, 另一种是连接到 http服务上, 如果是前面一种方式, broker挂了, 整个服务就不可用了, 而且如果使用第一种方式, 一开始创建的所有topic的bundle都会在一个broker上. 如果是http服务, 那么, 如果服务后面有多个broker, 一个broker挂了的情况, 后面的topic请求过来的时候, 根据topic创建的流程走, 会落在当前的broker上.
faq
一个问题, authoritative 改怎么设置呢? loadManager.get().isCentralized()? 为什么这么设计?
总结
- 整个服务只有一个 leader broker, 负责进行资源统计、bundle迁移.
- NamespaceBundle对资源进行了抽象, NamespaceBundle负责多个topic, 一个Namespace下有多个NamespaceBundle
- topic创建的时候, 会按照字符串hash + 取余的操作将topic放在一个NamespaceBundle中,方便后面资源调度
- bundle没有merge操作
- broker如果挂了, bundle会由当前Lookup服务的broker负责当前bundle
720 Words
2019-03-29 23:11 +0800