Pulsar订阅进度同步原理

描述

Pulsar自带的跨集群数据复制,可以把一个集群收到的消息复制到其他集群上,这样消费者正在连接的集群挂掉后,可以从其他集群消费消息。

除了消息的复制,pulsar还支持订阅进度的同步,具体场景如下:

部署了三个集群A、B、C,并设置集群之间的消息两两复制。消费者A刚开始从集群A消费消息,当集群A挂掉后,消费者A切换到集群B消费消息,这时候消费者A不需要从头开始读取集群B上的消息,而是可以从上一次在集群A上消费失败的位置继续从集群B上消费消息。

实现

集群间的消息复制是通过内置生产者和消费者实现的,接收消息的集群上会启动一个消息读取进程,读取发送到该集群的消息,然后通过客户端接口发送到其他集群上。

也就是说消息的复制并不是直接把bookie中的存储文件复制到其他集群上,而是使用内置的订阅名称读取本地消息,然后发送到其他集群。这样的话,两个集群的消息元数据就不是一样的,同一条消息在多个集群的消息ID是不一样的。

所以如果想实现订阅进度的同步,需要把同一条消息在多个集群的消息ID关联上,然后在各个集群间同步上次消费完毕的消息ID,这样不同集群之间就都知道消费者上次消费确认的位置并能够移动本集群的消费位点了。

所以实现订阅进度同步的关键点在于

  • 集群间能相互通信

  • 同一条消息在多集群间的关联关系

  • 消费进度更新时能及时通知到其他集群

具体方法

Pulsar本身的消息同步并不是实时的、严格的同步,只是避免大范围的重复消费,还是会有一小段消息的重复。

Pulsar订阅进度同步的流程

  • 发布快照通知给其他集群。

  • 其他集群收到通知后,记录本集群的LAC指针并发送给开启同步流程的集群。

  • 开启流程的集群收到其他所有集群的返回消息后,分别建立其他集群LAC和本集群上一次消息ID的关联关系。

  • 开启流程的集群建立好消息的ID的关联关系后,生成一个快照消息,并插入到消息队列中。

  • 消费者返回确认后,会根据消息ID检测上一个快照,找到后就说明消费位置已经超过快照了,需要通知其他消费者更新消费位置了。

  • 其他集群收到位置更新通知后更新本集群的消费位置。

下面以某个场景详细说明下。

场景描述:

集群A、B、C三个集群互相同步消息,生产者都发送消息到集群A上。

集群A发布一个快照通知

创建Topic时会检测是否需要同步订阅进度,如果需要则通过ReplicatedSubscriptionsController启动一个定时任务,任务的功能就是发送一个通知,代码如下:

ReplicatedSubscriptionsController

private void startNewSnapshot() {
cleanupTimedOutSnapshots();
// 判断是否有新消息写入了
if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
}
return;
}

MutableBoolean anyReplicatorDisconnected = new MutableBoolean();
topic.getReplicators().forEach((cluster, replicator) -> {
if (!replicator.isConnected()) {
anyReplicatorDisconnected.setTrue();
}
});

// 当所有集群都正常时才会发送通知。
if (anyReplicatorDisconnected.isTrue()) {
// Do not attempt to create snapshot when some of the clusters are not reachable
if (log.isDebugEnabled()) {
log.debug("[{}] Do not attempt to create snapshot when some of the clusters are not reachable.",
topic.getName());
}
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Starting snapshot creation.", topic.getName());
}

pendingSnapshotsMetric.inc();
ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
pendingSnapshots.put(builder.getSnapshotId(), builder);
// 这里只是通过往该Topic写入一条消息,消息中包含通知ID。
builder.start();
}

ReplicatedSubscriptionsSnapshotBuilder

void start() {
if (log.isDebugEnabled()) {
log.debug("[{}] Starting new snapshot {} - Clusters: {}", controller.topic().getName(), snapshotId,
missingClusters);
}
startTimeMillis = clock.millis();
// 设置通知ID(快照ID)和本集群名称。
controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
}

集群B和C收到通知

集群A写入消息到该Topic后,该条消息会和其他消息一样发送到集群B和集群C上。

由于集群间是相互复制消息,所以集群B和C的复制模块能够读取到该消息,并作相应的处理。

PersistentReplicator

private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
if (!msg.getMessageBuilder().hasMarkerType()) {
// No marker is defined
return;
}

int markerType = msg.getMessageBuilder().getMarkerType();

// 这里确保只处理消息发送方集群发送的消息,即只处理集群A发送的消息。
if (!(msg.getMessageBuilder().hasReplicatedFrom()
&& remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) {
// Only consider markers that are coming from the same cluster that this
// replicator instance is assigned to.
// All the replicators will see all the markers, but we need to only process
// it once.
return;
}

// 需要处理的消息包含三类:通知、通知应答、消费位置更新。
switch (markerType) {
case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE:
case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE:
case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE:
topic.receivedReplicatedSubscriptionMarker(position, markerType, payload);
break;

default:
// Do nothing
}
}

收到通知后,会获取LAC,然后封装LAC,写入到该Topic中。

ReplicatedSubscriptionsController

private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
// if replicator producer is already closed, restart it to send snapshot response
Replicator replicator = topic.getReplicators().get(request.getSourceCluster());
if (!replicator.isConnected()) {
topic.startReplProducers();
}

// Send response containing the current last written message id. The response
// marker we're publishing locally and then replicating will have a higher
// message id.
// 得到集群上次写入的消息
PositionImpl lastMsgId = (PositionImpl) topic.getLastPosition();
if (log.isDebugEnabled()) {
log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
}

// 把消息ID连同通知ID一起写入到Topic中。
ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
request.getSnapshotId(),
request.getSourceCluster(),
localCluster,
lastMsgId.getLedgerId(), lastMsgId.getEntryId());
writeMarker(marker);
}

集群A建立本集群和集群B、C间消费ID的关联关系

集群B和C写入的消息会被集群B和C的复制模块发送到集群A,集群A的复制模块会读到该消息并做相应的处理。

ReplicatedSubscriptionsController

private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
String snapshotId = response.getSnapshotId();
ReplicatedSubscriptionsSnapshotBuilder builder = pendingSnapshots.get(snapshotId);
if (builder == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
response.getCluster().getCluster());
}
return;
}

builder.receivedSnapshotResponse(position, response);
}

ReplicatedSubscriptionsSnapshotBuilder

// 该Position是集群A和B返回的消息在集群A上的Position。
synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received response from {}", controller.topic().getName(),
response.getCluster().getCluster());
}
String cluster = response.getCluster().getCluster();
// Response中包含集群A/集群B中的LAC消息。
responses.putIfAbsent(cluster, new MarkersMessageIdData().copyFrom(response.getCluster().getMessageId()));
missingClusters.remove(cluster);

if (log.isDebugEnabled()) {
log.debug("[{}] Missing clusters {}", controller.topic().getName(), missingClusters);
}

if (!missingClusters.isEmpty()) {
// We're still waiting for more responses to come back
return;
}

// We have now received all responses
// 如果超过两个集群需要两次通知,这里没看懂为啥...
if (needTwoRounds && !firstRoundComplete) {
// Mark that 1st round is done and start a 2nd round
firstRoundComplete = true;
missingClusters.addAll(remoteClusters);

controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Snapshot is complete {}", controller.topic().getName(), snapshotId);
}
// Snapshot is now complete, store it in the local topic
// 这里是建立记录集群B/集群C的LAC消息和集群A收到集群B/集群C是保存LAC的消息的消息ID间的关联关系,而且是最后边一个消息。
// 所以这里不能实现同一条消息在三个集群间的关联。
PositionImpl p = (PositionImpl) position;
controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
p.getLedgerId(), p.getEntryId(), responses));
controller.snapshotCompleted(snapshotId);

double latencyMillis = clock.millis() - startTimeMillis;
snapshotMetric.observe(latencyMillis);
}

集群A建立好关联关系后,会生成一个快照,这个快照包含各集群消息ID的关联关系、

然后写入到该Topic中,之后这条消息就会复制到集群B和集群C上。

创建快照缓存

虽然上边的步骤已经生成快照了,但是快照的缓存并不上在上边添加的。当快照被写到Topic中,那么当消费者读取消息时就肯定能读到该快照。

当消息在Broker端被读到时,通过过滤器处理这些消息。

AbstractBaseDispatcher

public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
totalEntries++;
ByteBuf metadataAndPayload = entry.getDataBuffer();
int entryWrapperIndex = i + entryWrapperOffset;
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
? entryWrapper.get()[entryWrapperIndex].getMetadata()
: null;
msgMetadata = msgMetadata == null
? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
: msgMetadata;
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
// 省略代码 ...

} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
// 这里处理读到的快照消息。
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}

entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()
&& trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
entry.release();
continue;
}

// 省略代码 ...
}
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
// Remove the protobuf headers
Commands.skipMessageMetadata(headersAndPayload);

try {
ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
subscription.processReplicatedSubscriptionSnapshot(snapshot);
} catch (Throwable t) {
log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
return;
}
}

PersistentSubscription


@Override
public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
}
}

ReplicatedSubscriptionSnapshotCache

public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// 这里的本地消息ID,就是保存集群B/集群C LAC消息的消息ID,且是最新的一个。
MarkersMessageIdData msgId = snapshot.getLocalMessageId();
PositionImpl position = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId());

if (log.isDebugEnabled()) {
log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position,
snapshot.getSnapshotId());
}

snapshots.put(position, snapshot);

// Prune the cache
while (snapshots.size() > maxSnapshotToCache) {
snapshots.pollFirstEntry();
}
}

至此快照从存储中被加载到Broker内存中。

集群A更新进度

当消费者订阅消息时,快照才会被加载到Broker内存里,然后消费者返回确认消息时,会查找消息ID的上一个快照。

PersistentSubscription

@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();

// 省略代码 ...

// 当可删除位置更新了(注意不能处理单独确认的消息)
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
this.updateLastMarkDeleteAdvancedTimestamp();

// Mark delete position advance
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
.advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
if (snapshot != null) {
topic.getReplicatedSubscriptionController()
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
}
}
}

deleteTransactionMarker(properties);

if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
if (dispatcher != null) {
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
}
}
}

如果能找到快照,则说明需要更新进度了。

这个进度是上一次的快照进度,不是最新消费的进度。

ReplicatedSubscriptionsController

public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
snapshot.getClustersList().stream()
.map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
.collect(Collectors.toList()));
}

Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
ClusterMessageId cmid = snapshot.getClusterAt(i);
clusterIds.put(cmid.getCluster(), cmid.getMessageId());
}

ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
writeMarker(subscriptionUpdate);
}

集群B和C更新进度

更新进度的消息会被写入到Topic中,这个消息也会被复制到集群B和C。

集群B和C中的复制模块读到消息后会做如下处理:

ReplicatedSubscriptionsController

private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
MarkersMessageIdData updatedMessageId = null;
for (int i = 0, size = update.getClustersCount(); i < size; i++) {
ClusterMessageId cmid = update.getClusterAt(i);
if (localCluster.equals(cmid.getCluster())) {
updatedMessageId = cmid.getMessageId();
}
}

if (updatedMessageId == null) {
// No updates for this cluster, ignore
return;
}

// 这里的消息ID是上一次的LAC。
Position pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received update for subscription to {}", topic, update.getSubscriptionName(), pos);
}

// 更新消费者读取进度为上一次的LAC。
PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName());
if (sub != null) {
sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
} else {
// Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because
log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
topic.createSubscription(update.getSubscriptionName(),
InitialPosition.Latest, true /* replicateSubscriptionState */);
}
}

可以看到,消息进度的同步是由延迟的,而且是不是精确的。

其他说明

建立不同集群间消息ID的关联关系过程,是一个不精确的过程。集群A只是发送了一个快照通知,集群B和集群C把当前LAC发送给集群A,集群A读到这个LAC时,已经有新的消息写入到集群A了。此时集群A把集群B和集群C上次的LAC和本地集群中相对靠前的消息ID建立了关联关系。

比如:

集群A收到1、2、3三条消息,发送通知给集群B和集群C。

集群B读取到3后,可能有4、5、6又发送到集群B了,此时集群B发送的LAC是6。

集群C读取到3后,可能有4、5、6、7又发送到集群C了,此时集群C发送的LAC是7。

集群A读取到6后,记录下来,等待集群C的LAC。

集群A读取到7后,可能有8、9发送到集群A了,此时存储7的消息ID变成了10。

集群A建立的关联关系是10 - 6(集群B)- 7(集群C)。

当集群A的消费者读取到10的时候,把快照缓存到Broker的内存里,读取到11的时候,发现前边有一个快照。然后记录更新通知,写到Topic里。

集群B和集群C读取到快照时,会把进度分别更新到6和7,但此时消费者其实已经读取到11了。

按照这种解释,订阅进度永远不可能完全一致,即使集群A长时间都没收到消息了,集群B和集群C的消费进度也不会和集群A一致。

Author: iMine
Link: https://imine141.github.io/2021/10/15/pulsar/Pulsar%E8%AE%A2%E9%98%85%E8%BF%9B%E5%BA%A6%E5%90%8C%E6%AD%A5%E5%8E%9F%E7%90%86/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.