pulsar负载均衡

一、动态分配

pulsar本身是集群部署的,每个集群包含多个broker。当创建新的topic时,会根据各个broker的负载情况,选择一个相对空闲的broker节点负责新topic的消息发布和订阅。如果某个broker的负载过高或者broker挂掉,也会重新进行topic的分配。从而确保整个集群在一个相对均衡稳定的情况下运行。

整个分配过程是自动的,不需要外部干预。

而且,由于broker只负责接收和发送消息,不负责消息持久化,所有broker本身是无状态的,使得broker的管理和topic分配都变得相对简单。

二、bundle

topic是消息发布订阅的最小粒度,在某些情况下会被频繁的创建、销毁,所有如果按照topic的粒度进行负载均衡会增加难度。因此,pulsar选择在namespace和topic之间虚构一层bundle,然后把topic分配到各个bundle中,以bunlde为粒度进行负载均衡。

默认情况下,一个namespace创建后会分配4个bunlde,每个bundle负责一个数字区间范围(0 - 0xFFFFFFFF),当有新的topic创建后,会对topic取hash,然后根据hash值把topic分配到某一个bundle下。

# 当命名空间创建时没有指定 bundle 数量时,将使用这个默认的值。defaultNumberOfNamespaceBundles=4

每个bunlde会被分配到一个broker上,需要重新分配时,会对整个bunlde进行移动。

bunlde的设置方式:

# 创建ns2,设置bundle个数为16
./pulsar-admin namespaces create tenant_vv/ns2 --bundles 16
# 查看bundle个数
./pulsar-admin namespaces bundles tenant_vv/ns2{ "boundaries" : [ "0x00000000", "0x10000000", "0x20000000", "0x30000000", "0x40000000", "0x50000000", "0x60000000", "0x70000000", "0x80000000", "0x90000000", "0xa0000000", "0xb0000000", "0xc0000000", "0xd0000000", "0xe0000000", "0xf0000000", "0xffffffff" ], "numBundles" : 16}

三、bundle拆分

通过hash方式可以把topic分配到某一个bundle上,通过增加bundle个数可以尽可能的让topic分配到不同的bundle上,从而让整个负载结果相对均衡。但如果topic的hash都很接近,或者一个bundle上topic的消息量特别大, 就会造成这个bundle压力过大,从而也造成处理该bundle的broker压力过大。

所以,pulsar支持对bundle进行拆分,拆分后的bundle会被重新分配。可以在conf/broker.conf中设置bundle拆分策略,具体如下:

# 启用/禁用 自动拆分命名空间中的bundleloadBalancerAutoBundleSplitEnabled=true# 启用/禁用 自动卸载切分的bundleloadBalancerAutoUnloadSplitBundlesEnabled=true# bundle 中最大的主题数, 一旦超过这个值,将触发拆分操作。loadBalancerNamespaceBundleMaxTopics=1000# bundle 最大的session数量(生产 + 消费), 一旦超过这个值,将触发拆分操作。loadBalancerNamespaceBundleMaxSessions=1000# bundle 最大的msgRate(进+出)的值, 一旦超过这个值,将触发拆分操作。loadBalancerNamespaceBundleMaxMsgRate=30000# bundle 最大的带宽(进+出)的值, 一旦超过这个值,将触发拆分操作loadBalancerNamespaceBundleMaxBandwidthMbytes=100# 命名空间中最大的 bundle 数量 (用于自动拆分bundle时)loadBalancerNamespaceMaximumBundles=128

四、broker过载检测

broker会根据cpu、网卡流量和内存判断是否过载,如果broker过载了,则会卸载上边的一些bundle。关于自动卸载的配置项:

# 启用/禁用自动负载拆分
loadBalancerSheddingEnabled=true
# 负载切分时间间隔。Broker 定期检查,是否有一些流量需要从一些负载比较高的 broker,转移到负载较低的 broker 上。
loadBalancerSheddingIntervalMinutes=1
# 防止同一个主题,在同一个时间窗口,被多次迁移的时间间隔。
loadBalancerSheddingGracePeriodMinutes=30
# 使用阈值确定 broker 是否过载
loadBalancerBrokerOverloadedThresholdPercentage=85
# 覆盖自动获取的网卡最大速度
# 此选项在某些环境中是有用的(例如:EC2 VMs) ,因为Linux报告的网卡最大速度不是 broker 真实的值。
# 因为负载管理器是根据网络使用情况来判断 broker 是否负载,为了确保信息是正确的,你可以通过这个参数来指定正确的值。这个配置值可以是一个 double 类型的值(比如: 0.8)。
# 能够通过这个配置项在网卡带宽使用完之前,触发负载切分操作。
loadBalancerOverrideBrokerNicSpeedGbps=

五、相关代码处理逻辑

关于负载均衡的代码在com.apache.pulsar.broker.loadbalance包下。

1. PulsarService服务启动后,会初始化负载均衡的模块

protected void startLeaderElectionService() {
this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
if (getConfiguration().isLoadBalancerEnabled()) {
long loadSheddingInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
}
// 启动两个定时任务
loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
}
} else {
if (leaderElectionService != null) {
LOG.info("This broker is a follower. Current leader is {}",
leaderElectionService.getCurrentLeader());
}
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
loadResourceQuotaTask = null;
}
}
});
leaderElectionService.start();
}

2. 负载均衡管理器选择

pulsar提供了默认的负载均衡器,同时也支持自定义负载均衡策略。负载均衡管理模块LoadManager的初始化代码如下:

static LoadManager create(final PulsarService pulsar) {
try {
// 如果配置文件中指定了自定义的均衡管理器,则使用自定义的。
final ServiceConfiguration conf = pulsar.getConfiguration();
final Class<?> loadManagerClass = Class.forName(conf.getLoadManagerClassName());
// Assume there is a constructor with one argument of PulsarService.
final Object loadManagerInstance = loadManagerClass.newInstance();
if (loadManagerInstance instanceof LoadManager) {
final LoadManager casted = (LoadManager) loadManagerInstance;
casted.initialize(pulsar);
return casted;
} else if (loadManagerInstance instanceof ModularLoadManager) {
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
return casted;
}
} catch (Exception e) {
LOG.warn("Error when trying to create load manager: ", e);
}
// 使用内置的负载均衡器。
// If we failed to create a load manager, default to SimpleLoadManagerImpl.
return new SimpleLoadManagerImpl(pulsar);
}

可以看到,如果我们在配置文件中提供了自定义的加载管理器,则优先使用自定义的,否则使用默认的SimpleLoadManagerImpl类。

然后我们看下LoadManager提供的API:

/**
* LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
* namespace/ServiceUnit placement on machines/ResourceUnit. Each Concrete Load Manager will use different algorithms to
* generate this mapping.
*
* Concrete Load Manager is also return the least loaded broker that should own the new namespace.
*/
public interface LoadManager {
Logger LOG = LoggerFactory.getLogger(LoadManager.class);
String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
void start() throws PulsarServerException;
/**
* Is centralized decision making to assign a new bundle.
*/
boolean isCentralized();
/**
* Returns the Least Loaded Resource Unit decided by some algorithm or criteria which is implementation specific.
*/
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
/**
* Generate the load report.
*/
LoadManagerReport generateLoadReport() throws Exception;
/**
* Set flag to force load report update.
*/
void setLoadReportForceUpdateFlag();
/**
* Publish the current load report on ZK.
*/
void writeLoadReportOnZookeeper() throws Exception;
/**
* Publish the current load report on ZK, forced or not.
* By default rely on method writeLoadReportOnZookeeper().
*/
default void writeLoadReportOnZookeeper(boolean force) throws Exception {
writeLoadReportOnZookeeper();
}
/**
* Update namespace bundle resource quota on ZK.
*/
void writeResourceQuotasToZooKeeper() throws Exception;
/**
* Generate load balancing stats metrics.
*/
List<Metrics> getLoadBalancingMetrics();
/**
* Unload a candidate service unit to balance the load.
*/
void doLoadShedding();
/**
* Namespace bundle split.
*/
void doNamespaceBundleSplit() throws Exception;
/**
* Removes visibility of current broker from loadbalancer list so, other brokers can't redirect any request to this
* broker and this broker won't accept new connection requests.
*
* @throws Exception
*/
void disableBroker() throws Exception;
/**
* Get list of available brokers in cluster.
*
* @return
* @throws Exception
*/
Set<String> getAvailableBrokers() throws Exception;
void stop() throws PulsarServerException;
/**
* Initialize this LoadManager.
*
* @param pulsar
* The service to initialize this with.
*/
void initialize(PulsarService pulsar);
}

可以看到,LoadManager提供了关于加载状态汇报、保存状态到zk、卸载服务以平衡加载、bundle拆分、停止broker和获取可用broker等api,方便我们对整个集群的加载进行管理和控制。

我们自己实现的加载管理器,需要采集各个broker节点的状态,然后根据topic和bundle信息,平均分配bundle到不同的broker上,确保各个broker的负载相对均衡。

3. LoadSheddingTask(加载移除)

这个定时任务就是调用加载管理器的doLoadShedding方法,如下:

@Override
public void run() {
try {
loadManager.get().doLoadShedding();
} catch (Exception e) {
LOG.warn("Error during the load shedding", e);
}
}

在SimpleLoadManagerImpl中,是通过遍历所有broker汇报的bundle状态,找到其中过载的bundle,然后卸载掉。

pulsar.getAdminClient().namespaces().unloadNamespaceBundle(
LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
LoadManagerShared.getBundleRangeFromBundleName(bundleName));
);

4. LoadResourceQuotaUpdaterTask

这个定时任务就是调用资源管理器的writeResourceQuatosToZookeeper把资源使用情况写入zk,如下:

@Override
public void run() {
try {
this.loadManager.get().writeResourceQuotasToZooKeeper();
} catch (Exception e) {
LOG.warn("Error write resource quota", e);
}
}

六、待研究问题

自定义均衡管理器需要注意哪些内容。

如何管理topic和bundle的分配策略。

是否可实现一个topic分配到多个bundle上,多个bundle分配到不同的broker上,数据存储不变,这样实现通过多个broker读取一个topic的数据。

七、参考资料

https://pulsar.apache.org/docs/en/administration-load-balance/

https://pulsar.apache.org/docs/en/develop-load-manager/

Author: iMine
Link: https://imine141.github.io/2021/08/10/pulsar/pulsar%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.