一、动态分配
pulsar本身是集群部署的,每个集群包含多个broker。当创建新的topic时,会根据各个broker的负载情况,选择一个相对空闲的broker节点负责新topic的消息发布和订阅。如果某个broker的负载过高或者broker挂掉,也会重新进行topic的分配。从而确保整个集群在一个相对均衡稳定的情况下运行。
整个分配过程是自动的,不需要外部干预。
而且,由于broker只负责接收和发送消息,不负责消息持久化,所有broker本身是无状态的,使得broker的管理和topic分配都变得相对简单。
二、bundle
topic是消息发布订阅的最小粒度,在某些情况下会被频繁的创建、销毁,所有如果按照topic的粒度进行负载均衡会增加难度。因此,pulsar选择在namespace和topic之间虚构一层bundle,然后把topic分配到各个bundle中,以bunlde为粒度进行负载均衡。
默认情况下,一个namespace创建后会分配4个bunlde,每个bundle负责一个数字区间范围(0 - 0xFFFFFFFF),当有新的topic创建后,会对topic取hash,然后根据hash值把topic分配到某一个bundle下。
# 当命名空间创建时没有指定 bundle 数量时,将使用这个默认的值。defaultNumberOfNamespaceBundles=4 |
每个bunlde会被分配到一个broker上,需要重新分配时,会对整个bunlde进行移动。
bunlde的设置方式:
# 创建ns2,设置bundle个数为16 |
三、bundle拆分
通过hash方式可以把topic分配到某一个bundle上,通过增加bundle个数可以尽可能的让topic分配到不同的bundle上,从而让整个负载结果相对均衡。但如果topic的hash都很接近,或者一个bundle上topic的消息量特别大, 就会造成这个bundle压力过大,从而也造成处理该bundle的broker压力过大。
所以,pulsar支持对bundle进行拆分,拆分后的bundle会被重新分配。可以在conf/broker.conf中设置bundle拆分策略,具体如下:
# 启用/禁用 自动拆分命名空间中的bundleloadBalancerAutoBundleSplitEnabled=true# 启用/禁用 自动卸载切分的bundleloadBalancerAutoUnloadSplitBundlesEnabled=true# bundle 中最大的主题数, 一旦超过这个值,将触发拆分操作。loadBalancerNamespaceBundleMaxTopics=1000# bundle 最大的session数量(生产 + 消费), 一旦超过这个值,将触发拆分操作。loadBalancerNamespaceBundleMaxSessions=1000# bundle 最大的msgRate(进+出)的值, 一旦超过这个值,将触发拆分操作。loadBalancerNamespaceBundleMaxMsgRate=30000# bundle 最大的带宽(进+出)的值, 一旦超过这个值,将触发拆分操作loadBalancerNamespaceBundleMaxBandwidthMbytes=100# 命名空间中最大的 bundle 数量 (用于自动拆分bundle时)loadBalancerNamespaceMaximumBundles=128 |
四、broker过载检测
broker会根据cpu、网卡流量和内存判断是否过载,如果broker过载了,则会卸载上边的一些bundle。关于自动卸载的配置项:
# 启用/禁用自动负载拆分 |
五、相关代码处理逻辑
关于负载均衡的代码在com.apache.pulsar.broker.loadbalance包下。
1. PulsarService服务启动后,会初始化负载均衡的模块
protected void startLeaderElectionService() { |
2. 负载均衡管理器选择
pulsar提供了默认的负载均衡器,同时也支持自定义负载均衡策略。负载均衡管理模块LoadManager的初始化代码如下:
static LoadManager create(final PulsarService pulsar) { |
可以看到,如果我们在配置文件中提供了自定义的加载管理器,则优先使用自定义的,否则使用默认的SimpleLoadManagerImpl类。
然后我们看下LoadManager提供的API:
/** |
可以看到,LoadManager提供了关于加载状态汇报、保存状态到zk、卸载服务以平衡加载、bundle拆分、停止broker和获取可用broker等api,方便我们对整个集群的加载进行管理和控制。
我们自己实现的加载管理器,需要采集各个broker节点的状态,然后根据topic和bundle信息,平均分配bundle到不同的broker上,确保各个broker的负载相对均衡。
3. LoadSheddingTask(加载移除)
这个定时任务就是调用加载管理器的doLoadShedding方法,如下:
@Override |
在SimpleLoadManagerImpl中,是通过遍历所有broker汇报的bundle状态,找到其中过载的bundle,然后卸载掉。
pulsar.getAdminClient().namespaces().unloadNamespaceBundle( |
4. LoadResourceQuotaUpdaterTask
这个定时任务就是调用资源管理器的writeResourceQuatosToZookeeper把资源使用情况写入zk,如下:
@Override |
六、待研究问题
自定义均衡管理器需要注意哪些内容。
如何管理topic和bundle的分配策略。
是否可实现一个topic分配到多个bundle上,多个bundle分配到不同的broker上,数据存储不变,这样实现通过多个broker读取一个topic的数据。
七、参考资料
https://pulsar.apache.org/docs/en/administration-load-balance/