# Max number of concurrent topic loading request broker allows to control number of zk-operations # 限制获取topic信息时,zk并发操作数量 maxConcurrentTopicLoadRequest=5000
在后续每一个流程中都会校验该topic是否由该broker负责。
判断ns中topic数量是否超过最大值,如果是则返回。
打开ledger。
// BrokerService -> createPersistentTopic // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, new OpenLedgerCallback())...
初始化ledger、bookeeper、cursor。
从zk中获取ledger信息。
// ManagedLedgerImpl -> synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) store.getManagedLedgerInfo(name, config.isCreateIfMissing(), new MetaStoreCallback<ManagedLedgerInfo>() {
voidrecover(final VoidCallback callback) { // Read the meta-data ledgerId from the store log.info("[{}] Recovering from bookkeeper ledger cursor: {}", ledger.getName(), name); ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, newMetaStoreCallback<ManagedCursorInfo>() { @Override publicvoidoperationComplete(ManagedCursorInfo info, Stat stat) { cursorLedgerStat = stat; lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive; // 如果zk中的cursorLedger是-1,则说明不需要从ledger中查询订阅进度,直接从zk中加载进度信息即可。 if (info.getCursorsLedgerId() == -1L) { // There is no cursor ledger to read the last position from. It means the cursor has been properly // closed and the last mark-delete position is stored in the ManagedCursorInfo itself. PositionImplrecoveredPosition=newPositionImpl(info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId()); if (info.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(info.getIndividualDeletedMessagesList()); } Map<String, Long> recoveredProperties = Collections.emptyMap(); if (info.getPropertiesCount() > 0) { // Recover properties map recoveredProperties = Maps.newHashMap(); for (inti=0; i < info.getPropertiesCount(); i++) { LongPropertyproperty= info.getProperties(i); recoveredProperties.put(property.getName(), property.getValue()); } } recoveredCursor(recoveredPosition, recoveredProperties, null); callback.operationComplete(); } else { // 需要从bk中加载消费进度信息。 // Need to proceed and read the last entry in the specified ledger to find out the last position log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name, info.getCursorsLedgerId()); recoverFromLedger(info, callback); } } @Override publicvoidoperationFailed(MetaStoreException e) { callback.operationFailed(e); } }); }
从ledger中加载进度信息。
protectedvoidrecoverFromLedger(final ManagedCursorInfo info, final VoidCallback callback) { // Read the acknowledged position from the metadata ledger, then create // a new ledger and write the position into it ledger.mbean.startCursorLedgerOpenOp(); longledgerId= info.getCursorsLedgerId(); OpenCallbackopenCallback= (rc, lh, ctx) -> { ... // Read the last entry in the ledger // 读取最新一个位置 longlastEntryInLedger= lh.getLastAddConfirmed(); ... lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> { ... // 读取最后写入ledger的entry LedgerEntryentry= seq.nextElement(); PositionInfo positionInfo; try { positionInfo = PositionInfo.parseFrom(entry.getEntry()); } catch (InvalidProtocolBufferException e) { callback.operationFailed(newManagedLedgerException(e)); return; } // 加载属性信息 Map<String, Long> recoveredProperties = Collections.emptyMap(); if (positionInfo.getPropertiesCount() > 0) { // Recover properties map recoveredProperties = Maps.newHashMap(); for (inti=0; i < positionInfo.getPropertiesCount(); i++) { LongPropertyproperty= positionInfo.getProperties(i); recoveredProperties.put(property.getName(), property.getValue()); } } PositionImplposition=newPositionImpl(positionInfo); // 如果有单独确认的消息(为了应对不是连续确认的情况)。 if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } recoveredCursor(position, recoveredProperties, lh); callback.operationComplete(); }, null); }; // 打开一个新的ledger,并把进度信息写入新ledger中。 try { bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null); } catch (Throwable t) { log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", ledger.getName(), ledgerId, name, t); openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null); } }