Pulsar Consumer Progress Persistence (Part 2)

This article covers persistent topics only.

1. Message Consumption Flow

The consumer and the broker exchange data over TCP. The transport is built on Netty, and the wire format is a TLV structure with two variants: simple commands without a payload, and payload messages.

See: https://pulsar.apache.org/docs/en/develop-binary-protocol/

1.1 Simple commands

Field         Length    Meaning
totalSize     4 bytes   Total frame size, excluding this field (a single frame is capped at 5 MB)
commandSize   4 bytes   Size of the serialized command
message                 The serialized command
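The framing above can be sketched with plain ByteBuffer arithmetic. This is a minimal illustration, not Pulsar's code: the command bytes here are a stand-in for the serialized BaseCommand.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SimpleCommandFrame {

    // Encode: [totalSize(4)][commandSize(4)][command bytes].
    // totalSize excludes its own 4 bytes but covers commandSize + message.
    static byte[] encode(byte[] command) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + command.length);
        buf.putInt(4 + command.length); // totalSize
        buf.putInt(command.length);     // commandSize
        buf.put(command);               // serialized command
        return buf.array();
    }

    // Decode: read the two length prefixes, then the command bytes.
    static byte[] decode(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int totalSize = buf.getInt();
        int commandSize = buf.getInt();
        byte[] command = new byte[commandSize];
        buf.get(command);
        return command;
    }

    public static void main(String[] args) {
        byte[] cmd = "SUBSCRIBE".getBytes(); // stand-in for a serialized command
        byte[] frame = encode(cmd);
        System.out.println(Arrays.equals(decode(frame), cmd)); // true
    }
}
```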

1.2 Payload messages (Payload Message)

This frame type is used to publish and deliver user messages.

The message field is not the user-defined message content, but one of the command types defined internally by the Pulsar protocol; the full list is here:

https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto

Pulsar commands used to be serialized with Google protobuf; to save space and cut down on generated code (protobuf emits a lot of it), serialization and deserialization later moved to lightproto (https://github.com/splunk/lightproto).

Field          Length    Meaning
totalSize      4 bytes   Total frame size, excluding this field (a single frame is capped at 5 MB)
commandSize    4 bytes   Size of the serialized command
message                  The serialized command
magicNumber    2 bytes   Frame marker, fixed at 0x0e01; when present, a CRC check is performed
checksum       4 bytes   Integrity check: CRC32-C computed over all bytes after this field (see 1.2.1)
metadataSize   4 bytes   Size of the metadata
metaData                 The metadata
payload                  Remaining content (the user message)

1.2.1 CRC check

When Netty delivers a frame whose command type is MESSAGE, handleMessage is invoked, which in turn calls the consumer's messageReceived method; the CRC check happens there.

The call chain is as follows (only the main logic of each method is listed; the code is not complete):

// PulsarDecoder -> channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HAProxyMessage) {
        HAProxyMessage proxyMessage = (HAProxyMessage) msg;
        this.proxyMessage = proxyMessage;
        proxyMessage.release();
        return;
    }
    // Get a buffer that contains the full frame
    ByteBuf buffer = (ByteBuf) msg;
    try {
        // De-serialize the command
        int cmdSize = (int) buffer.readUnsignedInt();
        cmd.parseFrom(buffer, cmdSize);
        log.info("vvv_msg_type " + cmd.getType());
        switch (cmd.getType()) {
        case MESSAGE: {
            checkArgument(cmd.hasMessage());
            handleMessage(cmd.getMessage(), buffer);
            break;
        }
        }
    }
}

// ClientCnx -> handleMessage
@Override
protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayload) {
    checkArgument(state == State.Ready);
    if (log.isDebugEnabled()) {
        log.debug("{} Received a message from the server: {}", ctx.channel(), cmdMessage);
    }
    ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
    if (consumer != null) {
        List<Long> ackSets = Collections.emptyList();
        if (cmdMessage.getAckSetsCount() > 0) {
            ackSets = new ArrayList<>(cmdMessage.getAckSetsCount());
            for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
                ackSets.add(cmdMessage.getAckSetAt(i));
            }
        }
        consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), ackSets, headersAndPayload, this);
    }
}

// ConsumerImpl -> messageReceived
void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
    if (log.isDebugEnabled()) {
        log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
                messageId.getEntryId());
    }
    log.info("headersAndPayload=" + headersAndPayload.readableBytes());
    if (!verifyChecksum(headersAndPayload, messageId)) {
        // discard message with checksum error
        discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
        return;
    }
    MessageMetadata msgMetadata;
    try {
        msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
    } catch (Throwable t) {
        discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
        return;
    }
}

// ConsumerImpl -> verifyChecksum
private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) {
    if (hasChecksum(headersAndPayload)) {
        int checksum = Commands.readChecksum(headersAndPayload);
        int computedChecksum = Crc32cIntChecksum.computeChecksum(headersAndPayload);
        if (checksum != computedChecksum) {
            log.error(
                    "[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
                    topic, subscription, messageId.getLedgerId(), messageId.getEntryId(),
                    Long.toHexString(checksum), Integer.toHexString(computedChecksum));
            return false;
        }
    }
    return true;
}

// Commands -> hasChecksum
public static boolean hasChecksum(ByteBuf buffer) {
    // magicCrc32c == 0x0e01
    return buffer.getShort(buffer.readerIndex()) == magicCrc32c;
}
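The check that hasChecksum and verifyChecksum perform can be reproduced with the JDK's own CRC32C class. This is a self-contained sketch of the byte layout described in 1.2, not Pulsar's implementation: the magic number (0x0e01) is followed by a 4-byte checksum computed over everything after the checksum field.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

public class ChecksumCheck {
    static final short MAGIC_CRC32C = 0x0e01;

    // Build [magic(2)][checksum(4)][body]; checksum covers only the body.
    static byte[] withChecksum(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + body.length);
        buf.putShort(MAGIC_CRC32C);
        CRC32C crc = new CRC32C();
        crc.update(body, 0, body.length);
        buf.putInt((int) crc.getValue()); // CRC32-C of everything after this field
        buf.put(body);
        return buf.array();
    }

    // Mirror of the hasChecksum/verifyChecksum pair: if the magic number is
    // present, recompute the CRC and compare with the stored value.
    static boolean verify(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getShort(0) != MAGIC_CRC32C) {
            return true; // no magic number -> no checksum to verify
        }
        buf.position(2);
        int stored = buf.getInt();
        CRC32C crc = new CRC32C();
        crc.update(frame, 6, frame.length - 6);
        return stored == (int) crc.getValue();
    }

    public static void main(String[] args) {
        byte[] frame = withChecksum("metadata+payload".getBytes());
        System.out.println(verify(frame));  // true
        frame[frame.length - 1] ^= 0x01;    // flip one payload bit
        System.out.println(verify(frame));  // false -> message would be discarded
    }
}
```

A corrupted frame fails verification, which is exactly the case where ConsumerImpl calls discardCorruptedMessage with ValidationError.ChecksumMismatch.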

1.3 Subscription and acknowledgment flow

Consumer/broker interaction runs on Netty; the business handlers are ClientCnx on the client side and ServerCnx on the broker side.

Reading the code shows that, from connecting to the broker through consuming messages and returning acknowledgments, the consumer sends the following command types in order:

(Figure: sequence of commands exchanged between consumer and broker)

CONNECT: establish the TCP connection; the broker verifies the client's permissions and, on success, replies with CONNECTED.

PARTITIONED_METADATA: fetch partition metadata. Since partitions are implemented as virtual topics, the partitioned-topic information can be obtained at this stage.

LOOKUP: look up which broker serves the topic by its name; the broker's address is returned.

SUBSCRIBE: the consumer connects to the returned broker address, sends the topic, subscriptionName, and other details, and starts the subscription.

FLOW: the consumer requests messages and tells the broker the maximum number of messages it can accept.

MESSAGE: upon receiving the FLOW request, the broker pulls the latest messages from its cache or from BookKeeper, wraps them into frames, and sends them to the consumer.

ACK: after receiving and processing messages, the consumer sends an acknowledgment (the messageId of each consumed message).

CLOSE_CONSUMER: request to close the connection.
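The FLOW step is a permit-based credit scheme: the broker only pushes messages while the consumer has outstanding permits. The class below is a toy model of that accounting, not Pulsar's implementation; the names are illustrative.

```java
public class FlowPermits {
    private int availablePermits;

    // Consumer sends FLOW(n): grant the broker n more message permits.
    void flow(int n) {
        availablePermits += n;
    }

    // Broker may push one message only while permits remain.
    boolean tryDispatch() {
        if (availablePermits == 0) {
            return false; // broker must wait for the next FLOW request
        }
        availablePermits--;
        return true;
    }

    public static void main(String[] args) {
        FlowPermits permits = new FlowPermits();
        permits.flow(2);                           // consumer asks for up to 2 messages
        System.out.println(permits.tryDispatch()); // true
        System.out.println(permits.tryDispatch()); // true
        System.out.println(permits.tryDispatch()); // false: out of permits
    }
}
```

This is why a consumer that stops sending FLOW requests stops receiving MESSAGE frames: the broker simply runs out of permits for it.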

2. Subscription Progress Persistence

  • All messages of a topic are written by exactly one broker, so it is straightforward to guarantee that every message gets a unique ID; this ID is also the handle used for consuming and acknowledging Pulsar messages.

An ID consists of:

ledgerId: each BookKeeper file corresponds to one ledgerId, maintained by BookKeeper.

entryId: increments within each ledger, maintained by BookKeeper.

partitionId: the partition ID, maintained by the broker.
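These components give message IDs a natural order: within a partition, positions compare by (ledgerId, entryId). The record below is an illustrative stand-in for Pulsar's MessageId, not its actual class.

```java
import java.util.Comparator;

public class MsgIdDemo {
    // Illustrative stand-in for a Pulsar message ID: ordering within a
    // partition is by (ledgerId, entryId); partitionId names the sub-topic.
    record MsgId(long ledgerId, long entryId, int partitionId)
            implements Comparable<MsgId> {
        public int compareTo(MsgId o) {
            return Comparator.comparingLong(MsgId::ledgerId)
                    .thenComparingLong(MsgId::entryId)
                    .compare(this, o);
        }
    }

    public static void main(String[] args) {
        MsgId a = new MsgId(603, 15, 0);
        MsgId b = new MsgId(603, 16, 0);
        MsgId c = new MsgId(604, 0, 0);
        System.out.println(a.compareTo(b) < 0); // true: same ledger, earlier entry
        System.out.println(b.compareTo(c) < 0); // true: earlier ledger comes first
    }
}
```

This ordering is what makes the mark-delete position and the acknowledgment ranges in the next bullets meaningful: "everything up to (ledgerId, entryId)" is well defined.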

  • In Pulsar, ManagedLedgerImpl handles writing and reading user messages, while ManagedCursorImpl tracks the consumption progress of each subscription under a topic.

  • After the consumer receives a message from the broker, it sends the acknowledgment (the messageId) back. The broker looks up the ledger for that subscription; if it exists, the ack is written to BookKeeper through that ledger;

    if it does not exist, a ledger is created, its metadata is written to ZooKeeper, and the ack is then written to BookKeeper through the new ledger.

  • The data written to ZooKeeper includes cursorLedgerId, markDeleteLedgerId, markDeleteEntryId, and so on, for example:

cursorsLedgerId=-1, markDeleteLedgerId=603, markDeleteEntryId=15, lastActive=1629692232349
upper ledgerId=603, upper entryId=18, lower ledgerId=603, lower entryId=16
upper ledgerId=37740, upper entryId=2, lower ledgerId=37740, lower entryId=1
upper ledgerId=37740, upper entryId=4, lower ledgerId=37740, lower entryId=3

upper and lower describe acknowledgments that are not contiguous.

Moreover, only the newest entry of the newest ledger is authoritative: when the broker initializes the cursor, it loads only the latest record from the most recent acknowledgment entry of the latest ledger.

  • Data stored in BookKeeper:
lowLedgerId=603, lowEntryId=16, upperLedgerId=603, upperEntryId=18
lowLedgerId=37740, lowEntryId=1, upperLedgerId=37740, upperEntryId=2
lowLedgerId=37740, lowEntryId=3, upperLedgerId=37740, upperEntryId=4
lowLedgerId=37740, lowEntryId=5, upperLedgerId=37740, upperEntryId=6

As the dumps show, BookKeeper mainly records the acknowledgment state itself, while ZooKeeper records which ledger holds that state and which data has been marked deletable (i.e., successfully consumed).

  • For messages acknowledged out of order, an acknowledgment range is recorded: every message in (low, upper] has been consumed successfully (open on the lower end, closed on the upper end).
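The open-closed convention can be captured in a few lines. This is a toy version of the cursor's range bookkeeping, sized to match the zk/bk dumps above; Pulsar itself uses a range-set structure over (ledgerId, entryId) pairs.

```java
// A range (lowEntry, upperEntry] within one ledger: open on the lower
// end, closed on the upper end.
public class AckRange {
    final long lowEntry;
    final long upperEntry;

    AckRange(long lowEntry, long upperEntry) {
        this.lowEntry = lowEntry;
        this.upperEntry = upperEntry;
    }

    boolean contains(long entryId) {
        return entryId > lowEntry && entryId <= upperEntry;
    }

    public static void main(String[] args) {
        AckRange range = new AckRange(16, 18); // lowEntryId=16, upperEntryId=18
        System.out.println(range.contains(16)); // false: lower bound is open
        System.out.println(range.contains(17)); // true
        System.out.println(range.contains(18)); // true: upper bound is closed
    }
}
```

So the dump line `lowLedgerId=603, lowEntryId=16, upperLedgerId=603, upperEntryId=18` means entries 17 and 18 of ledger 603 are acknowledged, while entry 16 is not.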

  • When the cursor is closed because the broker shuts down or for other reasons, the progress is persisted to ZooKeeper or BookKeeper. It is saved to BookKeeper when the following conditions hold:


private boolean shouldPersistUnackRangesToLedger() {
    return cursorLedger != null
            && !isCursorLedgerReadOnly
            && config.getMaxUnackedRangesToPersist() > 0
            && individualDeletedMessages.size() > config.getMaxUnackedRangesToPersistInZk();
}

2.1 Parsing the consumption progress in ZooKeeper

When a client connects, the cursor information for its subscription is first looked up in ZooKeeper, under the following path:

/managed-ledgers/{tenant}/{namespace}/persistent/{topic}/{subscriptionName}

public void testZKMetaDataCursorInfo() throws Exception {
    String path = "/managed-ledgers/tenant_c/ns1/persistent/topic_cursor/consumer_002";
    byte[] bytes = getValue(path);
    if (bytes == null) {
        System.out.println("bytes is null");
        return;
    }
    MLDataFormats.ManagedCursorInfo info = MLDataFormats.ManagedCursorInfo.parseFrom(bytes);
    StringBuilder sb = new StringBuilder();
    sb.append("cursorsLedgerId=").append(info.getCursorsLedgerId());
    sb.append(", markDeleteLedgerId=").append(info.getMarkDeleteLedgerId());
    sb.append(", markDeleteEntryId=").append(info.getMarkDeleteEntryId());
    sb.append(", lastActive=").append(info.getLastActive());
    if (info.getIndividualDeletedMessagesCount() > 0) {
        for (MLDataFormats.MessageRange range : info.getIndividualDeletedMessagesList()) {
            sb.append("\n");
            sb.append(" upper ledgerId=").append(range.getUpperEndpoint().getLedgerId());
            sb.append(", upper entryId=").append(range.getUpperEndpoint().getEntryId());
            sb.append(", lower ledgerId=").append(range.getLowerEndpoint().getLedgerId());
            sb.append(", lower entryId=").append(range.getLowerEndpoint().getEntryId());
        }
    }
    System.out.println("debug vv " + sb);
    System.out.println("done");
}

2.2 Parsing the consumption progress in BookKeeper

If the cursorLedgerId read from ZooKeeper is not -1, the data of that ledger is read from BookKeeper to restore the progress.

private void parsePulsarCursor(StringBuilder sb, LedgerEntry entry) throws Exception {
    MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntryBytes());
    // sb.append(", ledgerId=").append(positionInfo.getLedgerId());
    // sb.append(", entryId=").append(positionInfo.getEntryId());
    PositionImpl position = new PositionImpl(positionInfo);
    if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
        positionInfo.getIndividualDeletedMessagesList().forEach(messageRange -> {
            sb.setLength(0);
            MLDataFormats.NestedPositionInfo point = messageRange.getLowerEndpoint();
            sb.append(", lowLedgerId=").append(point.getLedgerId());
            sb.append(", lowEntryId=").append(point.getEntryId());
            point = messageRange.getUpperEndpoint();
            sb.append(", upperLedgerId=").append(point.getLedgerId());
            sb.append(", upperEntryId=").append(point.getEntryId());
            log.info(sb.toString());
        });
    } else {
        log.info("-----------------");
    }
    if (positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
        positionInfo.getBatchedEntryDeletionIndexInfoList().forEach(batchDeletedIndexInfo -> {
            sb.setLength(0);
            if (batchDeletedIndexInfo.getDeleteSetCount() > 0) {
                for (int i = 0; i < batchDeletedIndexInfo.getDeleteSetList().size(); i++) {
                    long indexId = batchDeletedIndexInfo.getDeleteSetList().get(i);
                    sb.append(", indexId=").append(indexId);
                }
            }
            log.info(sb.toString());
        });
    }
    log.info("=================");
}

3. Subscription Progress Recovery

Subscription progress is stored in ZooKeeper and BookKeeper, and is restored when the broker loads the topic information.

When a producer or a consumer connects to the broker, both paths call BrokerService's getTopic method (from ServerCnx's handleProducer and handleSubscribe methods respectively).

Since BrokerService is a singleton, consumers and producers share the same set of topic instances, and each topic instance in turn links to the ledger, cursors, and related state.

The topic instance creation flow in BrokerService:

  • If the topic has already been created, return it directly.
  • If this broker does not own the topic, return an exception.
  • Lock the creation path; once the lock is acquired, create the topic instance (at any moment there is at most one creation task per topic). The number of concurrent creations is bounded by the following setting:
return topics.computeIfAbsent(topic, (topicName) -> {
    return this.loadOrCreatePersistentTopic(topicName, createIfMissing);
});
# Max number of concurrent topic loading request broker allows to control number of zk-operations
# Limits the number of concurrent zk operations while loading topic information
maxConcurrentTopicLoadRequest=5000
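The computeIfAbsent call above is a single-flight pattern: concurrent callers for the same topic share one in-flight load instead of each starting their own. The class below is a minimal sketch of that pattern with a counter standing in for the real ledger-open work; the names are illustrative, not Pulsar's.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TopicCache {
    private final ConcurrentHashMap<String, CompletableFuture<String>> topics =
            new ConcurrentHashMap<>();
    final AtomicInteger loads = new AtomicInteger();

    // computeIfAbsent guarantees the mapping function runs at most once per
    // key, so only one load task exists per topic at any moment.
    CompletableFuture<String> getTopic(String name) {
        return topics.computeIfAbsent(name, n -> {
            loads.incrementAndGet(); // stand-in for opening the managed ledger
            return CompletableFuture.completedFuture("topic:" + n);
        });
    }

    public static void main(String[] args) {
        TopicCache cache = new TopicCache();
        cache.getTopic("persistent://tenant_c/ns1/topic_cursor");
        cache.getTopic("persistent://tenant_c/ns1/topic_cursor");
        System.out.println(cache.loads.get()); // 1: second caller reused the future
    }
}
```

Caching the future (rather than the finished topic) is what lets late callers join a load that is still in progress.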

  • Every subsequent step re-checks that this broker is still responsible for the topic.
  • If the number of topics in the namespace exceeds the maximum, return.
  • Open the ledger.
// BrokerService -> createPersistentTopic
// Once we have the configuration, we can proceed with the async open operation
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, new OpenLedgerCallback())...
  • Initialize the ledger, BookKeeper client, and cursors.
  • Fetch the ledger information from ZooKeeper.
// ManagedLedgerImpl -> synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx)
store.getManagedLedgerInfo(name, config.isCreateIfMissing(), new MetaStoreCallback<ManagedLedgerInfo>() {

Fetch the cursor information from ZooKeeper.

// ManagedLedgerImpl -> private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback)
store.getCursors(name, new MetaStoreCallback<List<String>>() {...

Recover the cursor contents.

void recover(final VoidCallback callback) {
    // Read the meta-data ledgerId from the store
    log.info("[{}] Recovering from bookkeeper ledger cursor: {}", ledger.getName(), name);
    ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() {
        @Override
        public void operationComplete(ManagedCursorInfo info, Stat stat) {
            cursorLedgerStat = stat;
            lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
            // If cursorsLedgerId in zk is -1, there is no need to read the progress
            // from a ledger; it can be loaded directly from zk.
            if (info.getCursorsLedgerId() == -1L) {
                // There is no cursor ledger to read the last position from. It means the cursor has been properly
                // closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
                PositionImpl recoveredPosition = new PositionImpl(info.getMarkDeleteLedgerId(),
                        info.getMarkDeleteEntryId());
                if (info.getIndividualDeletedMessagesCount() > 0) {
                    recoverIndividualDeletedMessages(info.getIndividualDeletedMessagesList());
                }
                Map<String, Long> recoveredProperties = Collections.emptyMap();
                if (info.getPropertiesCount() > 0) {
                    // Recover properties map
                    recoveredProperties = Maps.newHashMap();
                    for (int i = 0; i < info.getPropertiesCount(); i++) {
                        LongProperty property = info.getProperties(i);
                        recoveredProperties.put(property.getName(), property.getValue());
                    }
                }
                recoveredCursor(recoveredPosition, recoveredProperties, null);
                callback.operationComplete();
            } else {
                // The consumption progress has to be loaded from bk.
                // Need to proceed and read the last entry in the specified ledger to find out the last position
                log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
                        info.getCursorsLedgerId());
                recoverFromLedger(info, callback);
            }
        }

        @Override
        public void operationFailed(MetaStoreException e) {
            callback.operationFailed(e);
        }
    });
}

  • Load the progress information from the ledger.
protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallback callback) {
    // Read the acknowledged position from the metadata ledger, then create
    // a new ledger and write the position into it
    ledger.mbean.startCursorLedgerOpenOp();
    long ledgerId = info.getCursorsLedgerId();
    OpenCallback openCallback = (rc, lh, ctx) -> {
        ...
        // Read the last entry in the ledger
        // i.e. the most recent position
        long lastEntryInLedger = lh.getLastAddConfirmed();

        ...

        lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
            ...

            // Read the last entry written to the ledger
            LedgerEntry entry = seq.nextElement();
            PositionInfo positionInfo;
            try {
                positionInfo = PositionInfo.parseFrom(entry.getEntry());
            } catch (InvalidProtocolBufferException e) {
                callback.operationFailed(new ManagedLedgerException(e));
                return;
            }
            // Load the properties
            Map<String, Long> recoveredProperties = Collections.emptyMap();
            if (positionInfo.getPropertiesCount() > 0) {
                // Recover properties map
                recoveredProperties = Maps.newHashMap();
                for (int i = 0; i < positionInfo.getPropertiesCount(); i++) {
                    LongProperty property = positionInfo.getProperties(i);
                    recoveredProperties.put(property.getName(), property.getValue());
                }
            }
            PositionImpl position = new PositionImpl(positionInfo);
            // Restore individually acknowledged messages, if any (handles non-contiguous acks).
            if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
                recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
            }
            if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null
                    && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
                recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
            }
            recoveredCursor(position, recoveredProperties, lh);
            callback.operationComplete();
        }, null);
    };
    // Open the cursor ledger so its last entry can be read.
    try {
        bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null);
    } catch (Throwable t) {
        log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
                ledger.getName(), ledgerId, name, t);
        openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null);
    }
}
  • Load the individually acknowledged messages from the ledger.

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
    lock.writeLock().lock();
    try {
        individualDeletedMessages.clear();
        individualDeletedMessagesList.forEach(messageRange -> {
            MLDataFormats.NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint();
            MLDataFormats.NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint();
            // The acknowledged messages all fall within a single ledger.
            if (lowerEndpoint.getLedgerId() == upperEndpoint.getLedgerId()) {
                individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
                        upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
            } else {
                // The acknowledged messages span multiple ledgers, so the range
                // must be split per ledger.
                // Store message ranges after splitting them by ledger ID
                LedgerInfo lowerEndpointLedgerInfo = ledger.getLedgersInfo().get(lowerEndpoint.getLedgerId());
                if (lowerEndpointLedgerInfo != null) {
                    individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
                            lowerEndpoint.getLedgerId(), lowerEndpointLedgerInfo.getEntries() - 1);
                } else {
                    log.warn("[{}][{}] No ledger info of lower endpoint {}:{}", ledger.getName(), name,
                            lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId());
                }
                // The ranges are added in this order:
                // (lowerLedgerId, lowerEntryId) -> (lowerLedgerId, lastEntry)   first range
                // (ledgerId, -1) -> (ledgerId, lastEntry)                       intermediate ledgers
                // (upperLedgerId, -1) -> (upperLedgerId, upperEntryId)          last range
                for (LedgerInfo li : ledger.getLedgersInfo()
                        .subMap(lowerEndpoint.getLedgerId(), false, upperEndpoint.getLedgerId(), false).values()) {
                    individualDeletedMessages.addOpenClosed(li.getLedgerId(), -1, li.getLedgerId(),
                            li.getEntries() - 1);
                }
                individualDeletedMessages.addOpenClosed(upperEndpoint.getLedgerId(), -1,
                        upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
            }
        });
    } finally {
        lock.writeLock().unlock();
    }
}
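The splitting logic above can be sketched in isolation: a (lower, upper] range spanning several ledgers is cut into per-ledger open-closed ranges, where -1 marks "before the first entry" and entries-1 is a ledger's last entry. This is a self-contained toy, not Pulsar's code; the fixed ledger sizes are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitter {
    record Pos(long ledgerId, long entryId) {}
    record Range(Pos lower, Pos upper) {} // open-closed: (lower, upper]

    // entriesPerLedger[i] = entry count of ledger i (toy stand-in for LedgerInfo)
    static List<Range> split(Pos lower, Pos upper, long[] entriesPerLedger) {
        List<Range> out = new ArrayList<>();
        if (lower.ledgerId() == upper.ledgerId()) {
            out.add(new Range(lower, upper)); // single-ledger range: keep as-is
            return out;
        }
        // Tail of the lower ledger: (lowerEntry, lastEntry]
        out.add(new Range(lower,
                new Pos(lower.ledgerId(), entriesPerLedger[(int) lower.ledgerId()] - 1)));
        // Whole intermediate ledgers: (-1, lastEntry]
        for (long id = lower.ledgerId() + 1; id < upper.ledgerId(); id++) {
            out.add(new Range(new Pos(id, -1),
                    new Pos(id, entriesPerLedger[(int) id] - 1)));
        }
        // Head of the upper ledger: (-1, upperEntry]
        out.add(new Range(new Pos(upper.ledgerId(), -1), upper));
        return out;
    }

    public static void main(String[] args) {
        long[] entries = {10, 10, 10}; // three ledgers of 10 entries each
        List<Range> ranges = split(new Pos(0, 4), new Pos(2, 3), entries);
        System.out.println(ranges.size()); // 3: tail, middle ledger, head
    }
}
```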
  • Progress recovery is complete.
Author: iMine
Link: https://imine141.github.io/2021/08/23/pulsar/pulsar%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6%E4%BF%9D%E5%AD%98%EF%BC%88%E4%BA%8C%EF%BC%89/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.