1. 初衷
pulsar中每一个topic必须由唯一的一个broker负责读和写,这样在保证消息的顺序和事务方面就很方便。但是如果出现如下场景:
- 消费业务分布在多个地区
- 某一个topic的消费业务特别多
单个broker处理请求就显得不能满足需求了。
因此,就可以设计一种只读broker,类似zookeeper中的observer,该种角色的broker只负责消息的读,不负责消息的写入。这样既能满足异地消费时从本地存储中拉取数据,又能横向扩展单topic多业务消费能力。
2. 解析
在pulsar的架构中,消息的处理和存储是分离的,producer和consumer和broker进行数据的交互,broker收到数据后选择对应的bookie节点进行读写。如下图:
这种计算和存储分离的结构,虽然一定程度上会增大消息延迟,但是对于系统的整体可用性以及资源的平均分配有很大好处。
也因此在pulsar中,broker是无状态的,broker之间不需要进行信息的同步,不需要感知其他broker的状态。当一个consumer/producer连接到broker后,broker只需要通过zk判断是否应该由自己负责topic消息的读写即可(pulsar正常工作的前提也是zk是正常的)。
pulsar中每一个topic消息的读写都是由唯一的一个broker负责的(虽然有分区topic,但本质上还是拆分成独立的topic了),这样在保证消息顺序、事务、消息消费进度等方面都会变得很方便。但是就像开头说的那样,如果一个topic的消息成千上万的订阅者都需要,这样就会因broker单点造成性能的问题。
在开始实现只读功能之前,根据以往的经验,可以想到有以下几个问题需要关注:
只读broker怎么获取到某个topic写的进度,怎么能实时感知。
只读broker怎么保存某个subscription的消费进度。
只读broker之间是否需要同步消费进度,即是否支持一个subscription从多个broker订阅。
写broker是否需要消息消费进度。
怎么删除已消费完的消息(之前是有写broker判断如果所有的subscription都消费完成了,则删除消息,现在写broker不知道各个只读broker的消费进度)。
只读broker是单独部署还是说通过消费者发送的标识判断是否是以只读方式读取。
consumer连接到一个只读broker消费一定量消息后断开连接,连接到其他的只读broker,那么其他只读broker怎么获取到上次消费进度。
针对上述问题去pulsar源码中找答案,然后找一个折中的方案实现只读broker功能,首先我们先看消息订阅涉及到的几个阶段。
2.1 lookup
该阶段是为了查找topic属于哪一个broker负责,consumer传递的信息只有topic名称。
broker收到请求后,会依次执行以下逻辑:
校验topic是否合法
是否超过了设置的同时进行lookup的数量
是否通过代理方式连接(代理方式连接需要单独校验权限)
topic操作是否被授权
校验集群信息、权限信息
计算topic属于的bundle,然后根据bundle判断属于哪一个broker,并返回broker的地址
如果实现只读broker,我们就需要在这里添加判断逻辑,返回我们给定的broker地址。
2.2 subscribe
该阶段是发送订阅请求,broker可以拿到subName。
收到订阅请求后会执行以下逻辑:
权限验证
客户端信息是否已被保存(broker是通过客户端生成的id来区分不同的consumer的),如果有则直接返回
到BrokerService中查找该subName的topic信息,如果有则复用(topic信息由单一实例存储,例如PersistentTopic);如果没有则创建,然后会依次打开topic对应的ledger、ledger下的cursor。在这个过程中会进行topic归属broker的校验。
进行schema的处理
创建Subscription实例,保存订阅信息,并建立Consumer和Subscription之间的关系
返回订阅成功给consumer
在此阶段中,由于在创建Topic实例的同时会打开ledger和cursor,而ledger是默认以写的方式打开新的ledger的,所以在只读broker中,需要在这个阶段添加判断逻辑,以只读的方式打卡ledger,并且不会创建新的写ledger。
2.3 ledger和cursor恢复
该阶段是通过zk中记录的数据进行topic以往ledger和cursor进度的恢复。分别对应zk中的path节点:
/managed-ledgers/tenant_c/ns1/persistent/topic01
/managed-ledgers/tenant_c/ns1/persistent/topic01/consumer_00
这里需要注意的就是怎么同步ledger的LAC到cursor中,并实时感知topic下新的data ledger的创建以及获取最新的LAC信息并同步给cursor。
2.4 客户端断开连接
该阶段会进行资源的清理,包括取消订阅信息、关闭consumer等。
这里需要关注的就是要把消费进度持久化到zk中(默认不是实时刷新到zk的,断开连接后也不会及时持久化,虽然会实时写入bk中)。然后就是需要等一个topic下所有的sub都断开连接了,才进行消费进度的保存。
3. 思路
通过不断尝试,最终确定实现方式如下
设置指定的broker为只读broker,只读broker不提供写功能,并且不会启动写的功能,比如创建topic。
一个subscription只能从唯一的一个broker读取消息,之间的关系会写到zk上,broker收到subscribe请求后会进行判断;连接断开后会删除zk中的关系。
broker在consumer发送subscribe后会从zk/bk中拉取最新的消费进度,consumer断开连接后会把进度回写到zk中。这样就避免了只读broker之间同步进度。
只读broker在定时器中获取data ledger的LAC(对于closed状态的ledger,通过getLastAddConfirmed获取;对于open状态的ledger,通过readLastAddConfirmed)。当获取的LAC比记录的大时,会通过notifyCursors和notifyWaitingEntryCallBacks方法触发读操作。这种方式会造成一定的延迟,但也是可接受的。。。
当一个topic的所有subscription都断开时,会关闭cursor,触发进度的持久化。
当cursor恢复时,会查询zk中是否存在记录,如果存在则通过zk中的数据恢复进度(由于默认情况下cursor的进度都是在ManagedLedger实例第一次创建的时候恢复的,中间有新的cursor创建就不会有恢复流程了)。
只读broker和读写broker共用一套zk环境,但是只是往里面写入subscription的消费进度。
通过上述方式,可以实现一个简单的只读broker。
后续需要改进的地方包括
消费进度实时感知(当前情况下不能感知到最新一条消息,可能存在刷新延迟)。
支持多topic(目前没有测试这个,估计会有问题)。
支持分区topic(同上)。
延迟消息(目前测试有问题)。
只读broker模式下ManagedLedger的一些逻辑被注释掉了,没看懂具体功能,后续还需要继续研究。