Pulsar Consumption Progress Study (Part 1)

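In Pulsar, ManagedCursorImpl records the consumption progress of each subscription, and each cursor corresponds to one subscription. The cursor information is stored in ZooKeeper, and Pulsar also persists acknowledgment (ack) information to BookKeeper.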

Reading the code reveals the following:

1. The cursor information is read from the ZooKeeper node /managed-ledgers/tenant_c/ns1/persistent/storeV3/consumer_002 and stored in a ManagedCursorInfo object, whose main fields are:


message ManagedCursorInfo {
  // If the ledger id is -1, then the mark-delete position is
  // the one from the (ledgerId, entryId) snapshot below
  required int64 cursorsLedgerId = 1;
  // Last snapshot of the mark-delete position
  optional int64 markDeleteLedgerId = 2;
  optional int64 markDeleteEntryId = 3;
  repeated MessageRange individualDeletedMessages = 4;
  // Additional custom properties associated with
  // the current cursor position
  repeated LongProperty properties = 5;
  optional int64 lastActive = 6;
  // Store which index in the batch message has been deleted
  repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}

2. If cursorsLedgerId is -1, the ledger that records acknowledgments has been closed, and the consumption progress is given by the markDeleteLedgerId and markDeleteEntryId fields.

3. If cursorsLedgerId is set (not -1), the progress is read from the ledger that records acknowledgments.

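4. The last add-confirmed entry of that ledger is read to obtain the most recently persisted acknowledged position. The content stored in BookKeeper has the following format: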

message PositionInfo {
  required int64 ledgerId = 1;
  required int64 entryId = 2;
  repeated MessageRange individualDeletedMessages = 3;
  // Additional custom properties associated with
  // the current cursor position
  repeated LongProperty properties = 4;
  // Store which index in the batch message has been deleted
  repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
}

5. If there are individually acknowledged messages, they are read and stored in individualDeletedMessages.
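The five steps above boil down to one decision, sketched below in Java. This is a simplified, hypothetical model: `CursorInfo`, `PositionInfo`, `Range`, `Position` and the `readLastEntry` function stand in for the protobuf classes and the BookKeeper read, and are not Pulsar's real API. It only illustrates the rule: use the mark-delete snapshot from ZooKeeper when `cursorsLedgerId == -1`, otherwise take the position and the individually deleted ranges from the last entry of the cursor ledger.

```java
import java.util.List;
import java.util.function.LongFunction;

public class CursorRecoverySketch {
    // Simplified (ledgerId, entryId) pair; hypothetical stand-in for Pulsar's position type.
    record Position(long ledgerId, long entryId) {
        @Override
        public String toString() { return ledgerId + ":" + entryId; }
    }

    // Simplified range of individually acknowledged entries (stand-in for MessageRange).
    record Range(Position lower, Position upper) {}

    // Simplified ManagedCursorInfo as stored in ZooKeeper.
    record CursorInfo(long cursorsLedgerId, long markDeleteLedgerId, long markDeleteEntryId) {}

    // Simplified PositionInfo as stored in an entry of the cursor ledger.
    record PositionInfo(long ledgerId, long entryId, List<Range> individualDeletedMessages) {}

    // Recovery result: mark-delete position plus individually deleted ranges.
    record Recovered(Position markDelete, List<Range> individualDeleted) {}

    static Recovered recover(CursorInfo info, LongFunction<PositionInfo> readLastEntry) {
        if (info.cursorsLedgerId() == -1) {
            // No cursor ledger: the snapshot kept in ZooKeeper is the progress.
            return new Recovered(
                    new Position(info.markDeleteLedgerId(), info.markDeleteEntryId()), List.of());
        }
        // Otherwise the last add-confirmed entry of the cursor ledger holds the latest progress.
        PositionInfo last = readLastEntry.apply(info.cursorsLedgerId());
        return new Recovered(new Position(last.ledgerId(), last.entryId()),
                last.individualDeletedMessages());
    }

    public static void main(String[] args) {
        // Case 1: cursor ledger closed, progress comes from ZooKeeper.
        CursorInfo closed = new CursorInfo(-1, 56, 9);
        System.out.println(recover(closed, id -> {
            throw new IllegalStateException("cursor ledger should not be read");
        }).markDelete());

        // Case 2: cursor ledger 77 (a made-up id) holds a newer PositionInfo than the ZK snapshot.
        CursorInfo open = new CursorInfo(77, 50, 0);
        PositionInfo lastEntry = new PositionInfo(56, 9, List.of());
        System.out.println(recover(open, id -> lastEntry).markDelete());
    }
}
```

Both calls in `main` print `56:9`; in the second case the stale ZooKeeper snapshot (`50:0`) is ignored because the cursor ledger is authoritative.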

Ledger and cursor recovery when a consumer sends a subscribe request after a broker restart

ServerCnx:handleSubscribe

Validate the topic and perform authentication.

BrokerService:getTopic

If the topic exists, return the topic object from the cache.

If the topic does not exist and it is a persistent topic, call loadOrCreatePersistentTopic to load or create the topic object.
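The cache lookup in `getTopic` (and, one level down, the "open only if not opened yet" check in `asyncOpen`) follows a common pattern: keep one future per name in a concurrent map, so that concurrent subscribe requests for a topic that is not yet loaded share a single load. The sketch below is a hypothetical simplification: `TopicCacheSketch`, the loader function and the use of a `String` as the topic object are invented here, and Pulsar's real signatures and error handling differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class TopicCacheSketch {
    // One future per topic name: a cache hit returns it, a miss starts exactly one load.
    private final ConcurrentHashMap<String, CompletableFuture<String>> topics = new ConcurrentHashMap<>();
    private final Function<String, CompletableFuture<String>> loader;

    TopicCacheSketch(Function<String, CompletableFuture<String>> loader) {
        this.loader = loader;
    }

    CompletableFuture<String> getTopic(String name) {
        // computeIfAbsent runs the loader at most once per missing key.
        CompletableFuture<String> future = topics.computeIfAbsent(name, loader);
        // Design choice of this sketch: drop failed loads so a later request can retry.
        future.whenComplete((topic, ex) -> {
            if (ex != null) {
                topics.remove(name, future);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        TopicCacheSketch cache = new TopicCacheSketch(name -> {
            loads.incrementAndGet();
            // Stand-in for opening the managed ledger and recovering its cursors.
            return CompletableFuture.supplyAsync(() -> "topic:" + name);
        });
        String t = "persistent://tenant_c/ns1/topic_cursor";
        CompletableFuture<String> a = cache.getTopic(t);
        CompletableFuture<String> b = cache.getTopic(t);
        System.out.println(a.join() + " loads=" + loads.get() + " sameFuture=" + (a == b));
    }
}
```

Two lookups of the same topic return the same future and trigger a single load, which is why a burst of consumers connecting right after a restart does not open the managed ledger several times.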

BrokerService:createPersistentTopic

Fetch the managed-ledger configuration for the topic and open a managed ledger for it (creating it if necessary).

ManagedLedgerFactoryImpl:asyncOpen

If the ledger has not been opened yet, create a new ledger object and call its initialize method.

ManagedLedgerImpl:initialize

Fetch the ledger metadata from ZooKeeper, then initialize the BookKeeper ledger and the cursors.

ManagedLedgerImpl:initializeCursors

Fetch the subscriptions of the topic from ZooKeeper (the children of /managed-ledgers/tenant_c/ns1/persistent/topic_cursor), then initialize each subscription in turn.

Create a cursor object for each subscription and recover it.

Fetch the cursor information from ZooKeeper. If the cursor's ledger id (cursorsLedgerId) is -1, markDeleteLedgerId and markDeleteEntryId are used as the subscription progress.

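If the cursor's ledger id is not -1, the progress data is read from BookKeeper: the last entry of the cursor ledger is read, and the position stored in it is used as the subscription progress.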
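The cursor initialization step can be pictured as a fan-out/fan-in: one asynchronous recovery per child node, a counter of outstanding recoveries, and the ledger open completes only when that counter reaches zero. The `todo=0` in the `Recovery for cursor ... completed` log line below appears to report this counter. The sketch uses `CompletableFuture` and a hypothetical `recoverCursor` function; it illustrates the pattern and is not Pulsar's callback-based code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class CursorInitSketch {
    // Recovers every subscription (child node name) and completes once all of them are done.
    static CompletableFuture<Map<String, String>> initializeCursors(
            List<String> subscriptions, Function<String, CompletableFuture<String>> recoverCursor) {
        CompletableFuture<Map<String, String>> done = new CompletableFuture<>();
        Map<String, String> positions = new ConcurrentHashMap<>();
        if (subscriptions.isEmpty()) {
            done.complete(positions); // no cursors to recover
            return done;
        }
        AtomicInteger todo = new AtomicInteger(subscriptions.size());
        for (String sub : subscriptions) {
            recoverCursor.apply(sub).whenComplete((pos, ex) -> {
                if (ex != null) {
                    // In this sketch, one failed cursor fails the whole initialization.
                    done.completeExceptionally(ex);
                    return;
                }
                positions.put(sub, pos);
                int left = todo.decrementAndGet();
                System.out.println("Recovery for cursor " + sub + " completed. pos=" + pos + " -- todo=" + left);
                if (left == 0) {
                    done.complete(positions); // last cursor recovered: the ledger open can finish
                }
            });
        }
        return done;
    }

    public static void main(String[] args) {
        // Hypothetical recovery result mirroring the single subscription in the log.
        Map<String, String> result = initializeCursors(
                List.of("consumer_002"), sub -> CompletableFuture.completedFuture("56:9")).join();
        System.out.println(result);
    }
}
```

Running `main` prints `Recovery for cursor consumer_002 completed. pos=56:9 -- todo=0` followed by `{consumer_002=56:9}`, mirroring the corresponding broker log line.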

The broker logs the following:

# Authorization check
14:47:56.559 [pulsar-io-4-1] INFO auth.server.VVPulsarAuthorizationProvider - vv_auth_v2 allowTopicOperationAsync, topicName persistent://tenant_c/ns1/topic_cursor, role vv-role
# Lookup phase
14:47:56.561 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - vvv_try_to_lookup,topicName: persistent://tenant_c/ns1/topic_cursor, requestId: 2
# Authorization check
14:47:56.563 [pulsar-io-4-1] INFO auth.server.VVPulsarAuthorizationProvider - vv_auth_v2 allowTopicOperationAsync, topicName persistent://tenant_c/ns1/topic_cursor, role vv-role
# Authorization check
14:47:56.644 [pulsar-io-4-1] INFO auth.server.VVPulsarAuthorizationProvider - vv_auth_v2 allowTopicOperationAsync, topicName persistent://tenant_c/ns1/topic_cursor, role vv-role
# The consumer sends the subscribe command (note that the ledger is created after the subscribe)
14:47:56.644 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - vvv_subscribe, consumer_002 Subscribing on topic persistent://tenant_c/ns1/topic_cursor
14:47:56.646 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.21.32.64:59676] Subscribing on topic persistent://tenant_c/ns1/topic_cursor / consumer_002
# Because the broker has just restarted and has not loaded the ledger yet, the ledger object is only created after the consumer connects
14:47:56.640 [pulsar-ordered-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - vv_cc asyncOpen, name=tenant_c/ns1/persistent/topic_cursor
# Create the ledger
14:47:56.667 [pulsar-ordered-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger tenant_c/ns1/persistent/topic_cursor
# Fetch the ledger metadata from ZooKeeper
14:47:56.668 [pulsar-ordered-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - vvv_debug /managed-ledgers/tenant_c/ns1/persistent/topic_cursor
14:47:56.722 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[116, 101, 110, 97, 110, 116, 95, 99, 47, 110, 115, 49, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 116, 111, 112, 105, 99, 95, 99, 117, 114, 115, 111, 114], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
# Ledger created successfully
14:47:56.761 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Created ledger 603
# Start loading the cursor information
14:47:56.778 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Loading cursor consumer_002
14:47:56.792 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [tenant_c/ns1/persistent/topic_cursor] Recovering from bookkeeper ledger cursor: consumer_002
# Fetch the previously saved cursor information from ZooKeeper, mainly cursorsLedgerId, markDeleteLedgerId and markDeleteEntryId
14:47:56.792 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - vv_cc asyncGetCursorInfo, ledgerName=tenant_c/ns1/persistent/topic_cursor, cursorName=consumer_002, path=/managed-ledgers/tenant_c/ns1/persistent/topic_cursor/consumer_002
# Recover the cursor
14:47:56.798 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [tenant_c/ns1/persistent/topic_cursor] Cursor consumer_002 recovered to position 56:9
14:47:56.800 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Recovery for cursor consumer_002 completed. pos=56:9 -- todo=0
# Cursor recovery happens while the ledger is being created; once the cursors are recovered, ledger creation completes.
14:47:56.801 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [tenant_c/ns1/persistent/topic_cursor] Successfully initialize managed ledger
14:47:56.808 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.pulsar.broker.service.AbstractTopic - Disabling publish throttling for persistent://tenant_c/ns1/topic_cursor
14:47:56.828 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://tenant_c/ns1/topic_cursor] There are no replicated subscriptions on the topic
14:47:56.837 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://tenant_c/ns1/topic_cursor - dedup is disabled
14:47:56.843 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=tenant_c/ns1/persistent/topic_cursor, name=consumer_002, ackPos=56:9, readPos=56:10} to 603:-1 since ledger consumed completely
14:47:56.847 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Ledger 56 contains the current last confirmed entry 56:9, and it is going to be deleted
14:47:56.858 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] End TrimConsumedLedgers. ledgers=1 totalSize=0
14:47:56.858 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Removing ledger 56 - size: 673
14:47:57.251 [ForkJoinPool.commonPool-worker-5] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - vv_cc asyncOpenCursor, cursorName=consumer_002
14:47:57.262 [ForkJoinPool.commonPool-worker-5] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [tenant_c/ns1/persistent/topic_cursor-consumer_002] Rewind from 56:10 to 56:10
14:47:57.263 [ForkJoinPool.commonPool-worker-5] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://tenant_c/ns1/topic_cursor] There are no replicated subscriptions on the topic
14:47:57.263 [ForkJoinPool.commonPool-worker-5] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://tenant_c/ns1/topic_cursor][consumer_002] Created new subscription for 0
14:47:57.264 [ForkJoinPool.commonPool-worker-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.21.32.64:59676] Created subscription on topic persistent://tenant_c/ns1/topic_cursor / consumer_002
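The `Creating ledger` log line prints the ledger's custom metadata as raw byte arrays. Decoding them as UTF-8 (an assumption that holds here because every value is plain ASCII) makes them readable: `component` is `managed-ledger`, `application` is `pulsar`, and `pulsar/managed-ledger` is the managed ledger name `tenant_c/ns1/persistent/topic_cursor`. The small helper below is a hypothetical utility written for this purpose, not part of Pulsar.

```java
import java.nio.charset.StandardCharsets;

public class LedgerMetadataDecoder {
    // Turns the "[109, 97, ...]" text printed in the broker log back into a string.
    static String decode(String bracketedBytes) {
        // Strip brackets and whitespace, leaving "109,97,...".
        String body = bracketedBytes.replaceAll("[\\[\\]\\s]", "");
        if (body.isEmpty()) {
            return "";
        }
        String[] parts = body.split(",");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = (byte) Integer.parseInt(parts[i]);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode("[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114]"));
        System.out.println(decode("[112, 117, 108, 115, 97, 114]"));
    }
}
```

`main` prints `managed-ledger` and then `pulsar`.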
Author: iMine
Link: https://imine141.github.io/2021/08/20/pulsar/pulsar%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6%E7%A0%94%E7%A9%B6%EF%BC%88%E4%B8%80%EF%BC%89/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.