/**
 * Begins a new replicated-subscription snapshot round, unless it can be skipped.
 *
 * <p>The round is skipped when no data message has been published since the last completed
 * snapshot started, or when any replicator of the topic is currently disconnected.
 *
 * <p>NOTE(review): this is an excerpt. The remainder of the method (presumably building and
 * sending the snapshot request) and its closing brace are not shown here.
 */
private void startNewSnapshot() {
    cleanupTimedOutSnapshots();
    // Check whether any new data message has been written since the last completed snapshot
    // started; if not, a new snapshot would carry no new information.
    if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
        // There was no message written since the last snapshot, we can skip creating a new snapshot
        if (log.isDebugEnabled()) {
            log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
        }
        return;
    }

    // Record whether at least one replicator (one per remote cluster) is not connected.
    MutableBoolean anyReplicatorDisconnected = new MutableBoolean();
    topic.getReplicators().forEach((cluster, replicator) -> {
        if (!replicator.isConnected()) {
            anyReplicatorDisconnected.setTrue();
        }
    });

    // A snapshot round is only started when every replicator is connected.
    if (anyReplicatorDisconnected.isTrue()) {
        // Do not attempt to create snapshot when some of the clusters are not reachable
        if (log.isDebugEnabled()) {
            log.debug("[{}] Do not attempt to create snapshot when some of the clusters are not reachable.", topic.getName());
        }
        return;
    }

    if (log.isDebugEnabled()) {
        log.debug("[{}] Starting snapshot creation.", topic.getName());
    }
    // NOTE(review): excerpt ends here; the rest of the method body is not included.
private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) { if (!msg.getMessageBuilder().hasMarkerType()) { // No marker is defined return; }
int markerType = msg.getMessageBuilder().getMarkerType();
// 这里确保只处理消息发送方集群发送的消息,即只处理集群A发送的消息。 if (!(msg.getMessageBuilder().hasReplicatedFrom() && remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) { // Only consider markers that are coming from the same cluster that this // replicator instance is assigned to. // All the replicators will see all the markers, but we need to only process // it once. return; }
// 需要处理的消息包含三类:通知、通知应答、消费位置更新。 switch (markerType) { case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE: case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE: case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE: topic.receivedReplicatedSubscriptionMarker(position, markerType, payload); break;
default: // Do nothing } }
收到快照请求(通知)后,远端集群会获取该 Topic 最后写入消息的位置(文中称为 LAC),将其封装为快照应答标记消息,并写入到该 Topic 中。
ReplicatedSubscriptionsController
/**
 * Handles a snapshot request marker that was replicated from {@code request.getSourceCluster()}.
 *
 * <p>Ensures the replicator back to the requesting cluster is running, then reads the position
 * of the last message written to this topic, which the response reports.
 *
 * <p>NOTE(review): this is an excerpt. The part that builds and writes the response marker, and
 * the method's closing brace, are not shown here.
 *
 * @param request the snapshot request received from the source cluster
 */
private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
    // if replicator producer is already closed, restart it to send snapshot response
    // NOTE(review): if getReplicators() has no entry for the source cluster, get(...) presumably
    // returns null and replicator.isConnected() would throw a NullPointerException — confirm
    // this cannot happen.
    Replicator replicator = topic.getReplicators().get(request.getSourceCluster());
    if (!replicator.isConnected()) {
        topic.startReplProducers();
    }

    // Send response containing the current last written message id. The response
    // marker we're publishing locally and then replicating will have a higher
    // message id.
    // Position of the last message written to this topic in the local cluster.
    PositionImpl lastMsgId = (PositionImpl) topic.getLastPosition();
    if (log.isDebugEnabled()) {
        log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
    }
    // NOTE(review): excerpt ends here; the rest of the method body is not included.
/**
 * Records one remote cluster's snapshot response. Once all expected clusters have responded,
 * the method either starts a second round or completes the snapshot.
 *
 * <p>NOTE(review): this is an excerpt. The remainder of the second-round branch (including its
 * closing brace) and the end of the method are not shown here.
 *
 * @param position position of the received response marker. Per the original author's note,
 *                 this is its position in the requesting (local) cluster — confirm against the
 *                 caller.
 * @param response the snapshot response from a remote cluster
 */
synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Received response from {}", controller.topic().getName(),
                response.getCluster().getCluster());
    }
    String cluster = response.getCluster().getCluster();
    // The response carries the responding cluster's last message id. putIfAbsent keeps the
    // first id recorded for each cluster, so a later response from the same cluster does not
    // overwrite it.
    responses.putIfAbsent(cluster, new MarkersMessageIdData().copyFrom(response.getCluster().getMessageId()));
    missingClusters.remove(cluster);

    if (log.isDebugEnabled()) {
        log.debug("[{}] Missing clusters {}", controller.topic().getName(), missingClusters);
    }

    if (!missingClusters.isEmpty()) {
        // We're still waiting for more responses to come back
        return;
    }

    // We have now received all responses
    // Original author's note (translated): "With more than two clusters, two rounds of requests
    // are needed; unclear why." NOTE(review): needTwoRounds is set outside this excerpt — confirm
    // its condition and rationale against the replicated-subscriptions design (PIP-33).
    if (needTwoRounds && !firstRoundComplete) {
        // Mark that 1st round is done and start a 2nd round
        firstRoundComplete = true;
        // Expect a response from every remote cluster again in the second round.
        missingClusters.addAll(remoteClusters);
        // NOTE(review): excerpt omits the rest of this branch and its closing brace.

    if (log.isDebugEnabled()) {
        log.debug("[{}] Snapshot is complete {}", controller.topic().getName(), snapshotId);
    }
    // Snapshot is now complete, store it in the local topic
    // Original author's note (translated): this associates the last message ids reported by the
    // remote clusters (e.g. B and C) with the local position of the response marker being
    // handled here, i.e. the last one received. It therefore does not map one and the same
    // message across all three clusters.
    PositionImpl p = (PositionImpl) position;
    controller.writeMarker(
            Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
                    p.getLedgerId(), p.getEntryId(), responses));
    controller.snapshotCompleted(snapshotId);
    // NOTE(review): excerpt ends here; the rest of the method body is not included.
/**
 * Acknowledges the given positions on this subscription's cursor and reacts to the result.
 *
 * <p>The acknowledgement logic itself is elided in this excerpt. After it runs:
 * <ul>
 *   <li>if the mark-delete position advanced, the replicated-subscription snapshot cache is
 *       consulted;</li>
 *   <li>transaction markers are cleaned up;</li>
 *   <li>consumers are notified if the topic is terminated and the backlog is empty.</li>
 * </ul>
 *
 * @param positions  positions being acknowledged
 * @param ackType    acknowledgement type
 * @param properties acknowledgement properties, also passed to {@code deleteTransactionMarker}
 */
@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
    // Capture the mark-delete position before acknowledging, so an advance can be detected below.
    Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();

    // Code omitted ...

    // Only act when the mark-delete position moved. Individual acks that do not move it do not
    // trigger a replicated-subscription update.
    if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
        this.updateLastMarkDeleteAdvancedTimestamp();

        // Mark delete position advance
        // The field is read once into a local and then null-checked.
        // NOTE(review): presumably because it can be changed concurrently — confirm.
        ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
        if (snapshotCache != null) {
            // NOTE(review): presumably returns a cached snapshot at or before the new
            // mark-delete position, or null if none — confirm in ReplicatedSubscriptionSnapshotCache.
            ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
                    .advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
            if (snapshot != null) {
                // Hand the snapshot to the topic's replicated-subscription controller, if it has one.
                topic.getReplicatedSubscriptionController()
                        .ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
            }
        }
    }

    deleteTransactionMarker(properties);

    if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
        // Notify all consumer that the end of topic was reached
        if (dispatcher != null) {
            dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
        }
    }
}
如果能根据新的 mark-delete 位置在快照缓存中找到对应的快照,则说明需要向其他集群同步订阅进度了。
注意:这里同步的进度是上一次快照所记录的位置,而不是最新的消费位置。
ReplicatedSubscriptionsController
/**
 * Receives the cached snapshot that matched a local subscription's advanced mark-delete
 * position, and collects that snapshot's per-cluster message ids.
 *
 * <p>NOTE(review): this is an excerpt. The code that uses {@code clusterIds} (presumably writing
 * a subscription-update marker) and the method's closing brace are not shown here.
 *
 * @param subscriptionName name of the local subscription whose position advanced
 * @param snapshot         the snapshot whose per-cluster message ids are collected
 */
public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
    if (log.isDebugEnabled()) {
        log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
                snapshot.getClustersList().stream()
                        .map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
                                cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
                        .collect(Collectors.toList()));
    }

    // Map each cluster name to its message id in the snapshot. TreeMap keeps entries sorted by
    // cluster name.
    Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
    for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
        ClusterMessageId cmid = snapshot.getClusterAt(i);
        clusterIds.put(cmid.getCluster(), cmid.getMessageId());
    }
    // NOTE(review): excerpt ends here; the rest of the method body is not included.
private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { MarkersMessageIdData updatedMessageId = null; for (int i = 0, size = update.getClustersCount(); i < size; i++) { ClusterMessageId cmid = update.getClusterAt(i); if (localCluster.equals(cmid.getCluster())) { updatedMessageId = cmid.getMessageId(); } }
if (updatedMessageId == null) { // No updates for this cluster, ignore return; }
// 这里的消息ID是上一次的LAC。 Position pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
if (log.isDebugEnabled()) { log.debug("[{}][{}] Received update for subscription to {}", topic, update.getSubscriptionName(), pos); }
// 更新消费者读取进度为上一次的LAC。 PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName()); if (sub != null) { sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos); topic.createSubscription(update.getSubscriptionName(), InitialPosition.Latest, true /* replicateSubscriptionState */); } }