Pulsar message read flow (Part 1)

An overview of how Pulsar reads messages.

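The consumer sends a lookup command to find the address of the broker that serves the topic, then connects to that broker. When the broker receives the subscribe request, it first checks whether it already holds this subscription name in memory; if so, it looks up the corresponding subscription and attaches the consumer to it. It then obtains the ledger that stores the topic's messages and the subscription's cursor from ZooKeeper (zk) and BookKeeper (bk).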
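The lookup step can be pictured as a loop that follows redirects until some broker answers with the address to connect to. The sketch below is a toy model, not the Pulsar client's code: the `LookupSketch` class, its `Response` type, the `lookupTable` map and the `MAX_REDIRECTS` limit are all hypothetical stand-ins for the binary-protocol exchange, in which a broker answers a lookup either with "connect to this broker" or with "redirect: ask that broker instead".

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of topic lookup (hypothetical types, not the Pulsar client API).
class LookupSketch {
    enum Type { CONNECT, REDIRECT }

    // A broker's answer: either "connect to brokerUrl" or "ask brokerUrl instead".
    static final class Response {
        final Type type;
        final String brokerUrl;

        Response(Type type, String brokerUrl) {
            this.type = type;
            this.brokerUrl = brokerUrl;
        }
    }

    // Hypothetical cap so that a redirect cycle cannot loop forever.
    static final int MAX_REDIRECTS = 20;

    // lookupTable maps "broker that was asked" -> "its answer for our topic".
    static String lookup(Map<String, Response> lookupTable, String firstBroker) {
        String asked = firstBroker;
        for (int i = 0; i <= MAX_REDIRECTS; i++) {
            Response r = lookupTable.get(asked);
            if (r == null) {
                throw new IllegalStateException("no answer from " + asked);
            }
            if (r.type == Type.CONNECT) {
                return r.brokerUrl; // the broker that serves the topic
            }
            asked = r.brokerUrl; // follow the redirect and ask again
        }
        throw new IllegalStateException("too many lookup redirects");
    }

    public static void main(String[] args) {
        Map<String, Response> table = new HashMap<>();
        table.put("pulsar://b1:6650", new Response(Type.REDIRECT, "pulsar://b2:6650"));
        table.put("pulsar://b2:6650", new Response(Type.CONNECT, "pulsar://b2:6650"));
        System.out.println(lookup(table, "pulsar://b1:6650")); // pulsar://b2:6650
    }
}
```

The consumer can therefore start from any broker; only the final CONNECT answer determines where it opens its connection.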

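After that, the consumer sends flow permits to the broker (sendFlowPermitsToBroker) whenever it has room for more messages, rather than on a fixed timer. When the broker receives a flow command, it reads messages from the ledger and returns them to the consumer. Delivery is asynchronous: handleFlow processes the consumer's notification and triggers a read, and the messages are sent to the consumer once the read completes.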
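The asynchronous "flow, then read, then send" chain can be sketched with `CompletableFuture`. This is a toy model under stated assumptions: `AsyncDispatchSketch`, `readEntries` (a stand-in for the broker's asynchronous ledger read) and the `delivered` list (a stand-in for writing to the consumer's connection) are hypothetical and not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model: a flow command triggers an async read; delivery happens in the read's callback.
class AsyncDispatchSketch {
    private final List<String> ledger; // hypothetical stored entries
    private int readPosition = 0;      // next entry to read (single caller thread assumed)
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    AsyncDispatchSketch(List<String> ledger) {
        this.ledger = ledger;
    }

    // Stand-in for an asynchronous ledger read of up to maxEntries entries.
    private CompletableFuture<List<String>> readEntries(int maxEntries) {
        int from = readPosition;
        int to = Math.min(ledger.size(), from + maxEntries);
        readPosition = to;
        return CompletableFuture.supplyAsync(() -> new ArrayList<>(ledger.subList(from, to)));
    }

    // Stand-in for handleFlow: returns at once; entries are "sent" when the read completes.
    CompletableFuture<Void> handleFlow(int permits) {
        return readEntries(permits).thenAccept(delivered::addAll);
    }

    public static void main(String[] args) {
        AsyncDispatchSketch broker = new AsyncDispatchSketch(List.of("m0", "m1", "m2"));
        broker.handleFlow(2).join();
        System.out.println(broker.delivered); // [m0, m1]
    }
}
```

The caller of `handleFlow` is never blocked by the read; the `join()` in `main` exists only so the demo can print the result.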

1. Consumer-side flow

The consumer is created as follows:


Consumer<String> c = client.newConsumer(Schema.STRING)
        .topic(topic)
        .subscriptionName("subscription_001")
        .receiverQueueSize(4)
        .subscribe();
Message<String> msg = c.receive(5, TimeUnit.SECONDS);

subscribe() creates a consumer object; calling its receive method then fetches a message.

ConsumerBase

@Override
public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
    if (conf.getReceiverQueueSize() == 0) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Can't use receive with timeout, if the queue size is 0");
    }
    if (listener != null) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Cannot use receive() when a listener has been set");
    }
    verifyConsumerState();
    return internalReceive(timeout, unit);
}

ConsumerImpl


@Override
protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
    Message<T> message;
    try {
        // Poll the local receiver queue; return null if nothing arrives before the timeout.
        message = incomingMessages.poll(timeout, unit);
        if (message == null) {
            return null;
        }
        // Before handing the message to the application, return one permit; this may send
        // a flow command telling the broker that the consumer can accept more messages.
        messageProcessed(message);
        return beforeConsume(message);
    } catch (InterruptedException e) {
        State state = getState();
        if (state != State.Closing && state != State.Closed) {
            stats.incrementNumReceiveFailed();
            throw PulsarClientException.unwrap(e);
        } else {
            return null;
        }
    }
}

@Override
protected synchronized void messageProcessed(Message<?> msg) {
    ClientCnx currentCnx = cnx();
    ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
    lastDequeuedMessageId = msg.getMessageId();
    if (msgCnx != currentCnx) {
        // The processed message did belong to the old queue that was cleared after reconnection.
    } else {
        increaseAvailablePermits(currentCnx);
        stats.updateNumMsgsReceived(msg);
        trackMessage(msg);
    }
    decreaseIncomingMessageSize(msg);
}

void increaseAvailablePermits(ClientCnx currentCnx) {
    increaseAvailablePermits(currentCnx, 1);
}

protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
    int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
    while (available >= receiverQueueRefillThreshold && !paused) {
        // CAS ensures that only one thread sends a flow command for these permits
        if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
            sendFlowPermitsToBroker(currentCnx, available);
            break;
        } else {
            available = AVAILABLE_PERMITS_UPDATER.get(this);
        }
    }
}

/**
 * send the flow command to have the broker start pushing messages,
 * i.e. ask the broker asynchronously for more messages.
 */
private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
    if (cnx != null && numMessages > 0) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Adding {} additional permits", topic, subscription, numMessages);
        }
        if (log.isDebugEnabled()) {
            cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages))
                    .addListener(writeFuture -> {
                        if (!writeFuture.isSuccess()) {
                            log.debug("Consumer {} failed to send {} permits to broker: {}", consumerId, numMessages,
                                    writeFuture.cause().getMessage());
                        } else {
                            log.debug("Consumer {} sent {} permits to broker", consumerId, numMessages);
                        }
                    });
        } else {
            cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise());
        }
    }
}
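The permit bookkeeping in increaseAvailablePermits can be reproduced in isolation. The sketch below is a hypothetical `PermitCounter` class, not ConsumerImpl: it assumes the refill threshold is half the receiver queue size, drops the `paused` flag, and records each flow command in a list instead of writing `Commands.newFlow(...)` to the connection.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy version of the consumer-side permit logic (hypothetical class).
class PermitCounter {
    private final AtomicInteger availablePermits = new AtomicInteger();
    private final int refillThreshold;
    // Stands in for the flow commands written to the broker.
    final List<Integer> flowsSent = Collections.synchronizedList(new ArrayList<>());

    PermitCounter(int receiverQueueSize) {
        // Assumption: the threshold is half of the receiver queue size.
        this.refillThreshold = receiverQueueSize / 2;
    }

    // Called once per message handed to the application (cf. messageProcessed).
    void increaseAvailablePermits(int delta) {
        int available = availablePermits.addAndGet(delta);
        while (available >= refillThreshold) {
            // Only the thread that wins the CAS reports these permits to the broker.
            if (availablePermits.compareAndSet(available, 0)) {
                if (available > 0) { // mirrors the numMessages > 0 check
                    flowsSent.add(available);
                }
                break;
            }
            available = availablePermits.get();
        }
    }

    public static void main(String[] args) {
        PermitCounter c = new PermitCounter(4); // receiverQueueSize(4), as in the example above
        for (int i = 0; i < 4; i++) {
            c.increaseAvailablePermits(1);
        }
        System.out.println(c.flowsSent); // [2, 2]
    }
}
```

With a queue size of 4, the consumer does not ask for one message at a time; it batches permits and sends a flow of 2 every second message, which keeps the number of flow commands low.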

2. Broker-side flow

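The broker interacts with the consumer mainly through four methods in ServerCnx: handleConnect, handleSubscribe, handleFlow and handleAck. handleConnect processes authentication; handleSubscribe reads the subscription name and records the subscription relationship; handleFlow receives the consumer's flow permits and triggers reading messages from the ledger; handleAck processes message acknowledgements.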
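On the broker side, the permits carried by each flow command act as credit: the broker may push at most that many messages to the consumer, and unused credit carries over until new messages arrive. The sketch below is a hypothetical, single-threaded model of that accounting (`BrokerSideConsumer`, `backlog` and `pushed` are illustrative names); the real dispatcher also handles batching, multiple consumers and in-flight reads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of per-consumer credit on the broker.
class BrokerSideConsumer {
    private final Deque<String> backlog = new ArrayDeque<>(); // entries not yet sent
    private int permits = 0;                                   // credit from flow commands
    final List<String> pushed = new ArrayList<>();            // what was sent to the client

    // A new entry was written to the topic.
    void append(String entry) {
        backlog.addLast(entry);
        dispatch(); // leftover credit may allow sending it right away
    }

    // Cf. handleFlow: add credit, then push as much as the credit allows.
    void handleFlow(int messagePermits) {
        permits += messagePermits;
        dispatch();
    }

    private void dispatch() {
        while (permits > 0 && !backlog.isEmpty()) {
            pushed.add(backlog.pollFirst());
            permits--;
        }
    }

    int permits() {
        return permits;
    }

    public static void main(String[] args) {
        BrokerSideConsumer c = new BrokerSideConsumer();
        for (int i = 0; i < 5; i++) {
            c.append("e" + i);
        }
        c.handleFlow(4);
        System.out.println(c.pushed + " permits=" + c.permits()); // [e0, e1, e2, e3] permits=0
    }
}
```

Because the broker never pushes more than the granted credit, a slow consumer cannot be flooded beyond its receiver queue size.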

In the broker, each topic has one handling instance (PersistentTopic or NonPersistentTopic), which holds the topic's information and its subscriptions.

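subscriptions is itself a map keyed by subscription name that stores the detailed subscription state. All clients using the same subscription name share one Subscription, which is how the four subscription types (Exclusive, Shared, Failover, Key_Shared) are implemented.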

Each Subscription has a dispatcher field, which holds all consumers attached to that subscription.

So the chain in which this information is stored is:

topic -> subscription -> dispatcher -> consumer
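The chain above can be modeled with a map and a list. This is a deliberately simplified sketch: `ToyTopic`, `ToySubscription` and `ToyDispatcher` are hypothetical classes, not PersistentTopic, PersistentSubscription or the real dispatchers, and the round-robin pick only loosely imitates a Shared subscription. It assumes a missing subscription is created on first subscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the chain topic -> subscription -> dispatcher -> consumer.
class ToyTopic {
    final String name;
    // Keyed by subscription name: all consumers with the same name share one subscription.
    final Map<String, ToySubscription> subscriptions = new ConcurrentHashMap<>();

    ToyTopic(String name) {
        this.name = name;
    }

    // Cf. handleSubscribe: reuse the subscription if it exists, otherwise create it.
    ToySubscription subscribe(String subscriptionName, String consumerName) {
        ToySubscription sub = subscriptions.computeIfAbsent(subscriptionName, ToySubscription::new);
        sub.dispatcher.addConsumer(consumerName);
        return sub;
    }
}

class ToySubscription {
    final String name;
    final ToyDispatcher dispatcher = new ToyDispatcher(); // holds the attached consumers

    ToySubscription(String name) {
        this.name = name;
    }
}

class ToyDispatcher {
    private final List<String> consumers = new ArrayList<>();
    private int next = 0;

    synchronized void addConsumer(String consumerName) {
        consumers.add(consumerName);
    }

    // Round-robin choice of the consumer that receives the next message.
    synchronized String pickConsumer() {
        String c = consumers.get(next % consumers.size());
        next++;
        return c;
    }
}
```

Two consumers that subscribe with the same name end up in the same ToyDispatcher, while a different name yields a separate subscription with its own dispatcher.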

2.1 Information stored in ZooKeeper

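Message and subscription metadata are stored in ZooKeeper under the /managed-ledgers path. Two kinds of information matter here: ledger info and cursor info.

The ledger info records where the topic's messages are stored in BookKeeper, and the cursor info records a subscription's consumption progress. Their ZooKeeper paths are:

/managed-ledgers/{tenant}/{namespace}/persistent/{topic}
/managed-ledgers/{tenant}/{namespace}/persistent/{topic}/{subscription_name}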
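Both paths can be derived from a topic name. The helper below is a hypothetical sketch (`ManagedLedgerPaths`, `ledgerPath` and `cursorPath` are illustrative names) that assumes the tenant, namespace, topic and subscription names contain only URL-safe characters; Pulsar may URL-encode the topic's local name and the subscription name when it builds the real names, which this sketch does not do.

```java
// Hypothetical helper: derive the ZooKeeper paths of a persistent topic's managed
// ledger and of one of its cursors. Assumes no URL-encoding is needed.
class ManagedLedgerPaths {
    static final String PREFIX = "/managed-ledgers/";

    // "persistent://tenant/ns/topic" -> "/managed-ledgers/tenant/ns/persistent/topic"
    static String ledgerPath(String topic) {
        String scheme = "persistent://";
        if (!topic.startsWith(scheme)) {
            throw new IllegalArgumentException("expected a persistent topic: " + topic);
        }
        String[] parts = topic.substring(scheme.length()).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + topic);
        }
        return PREFIX + parts[0] + "/" + parts[1] + "/persistent/" + parts[2];
    }

    // A subscription's cursor node sits one level below the ledger node.
    static String cursorPath(String topic, String subscription) {
        return ledgerPath(topic) + "/" + subscription;
    }

    public static void main(String[] args) {
        System.out.println(cursorPath("persistent://tenant_c/ns1/topic_name", "subscription_name_1"));
        // /managed-ledgers/tenant_c/ns1/persistent/topic_name/subscription_name_1
    }
}
```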

The data in these znodes is protobuf-encoded and can be decoded directly. Test code:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZkMetadataTest {

    // Decodes the ManagedLedgerInfo of a topic: the ledgers that hold its messages.
    public void testZKMetaData() throws Exception {
        String path = "/managed-ledgers/tenant_c/ns1/persistent/topic_name";
        byte[] bytes = getValue(path);
        if (bytes == null) {
            System.out.println("bytes is null");
            return;
        }
        MLDataFormats.ManagedLedgerInfo info = MLDataFormats.ManagedLedgerInfo.parseFrom(bytes);
        StringBuilder sb = new StringBuilder();
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : info.getLedgerInfoList()) {
            sb.setLength(0);
            sb.append("ledgerId=").append(ls.getLedgerId());
            sb.append(", entries=").append(ls.getEntries());
            sb.append(", size=").append(ls.getSize());
            sb.append(", timestamp=").append(ls.getTimestamp());
            System.out.println("debug vv " + sb);
        }
        System.out.println("done");
    }

    // Decodes the ManagedCursorInfo of one subscription: its mark-delete position.
    // Change the last path segment (e.g. subscription_name_0) to inspect another subscription.
    public void testZKMetaDataCursorInfo() throws Exception {
        String path = "/managed-ledgers/tenant_c/ns1/persistent/topic_name/subscription_name_1";
        byte[] bytes = getValue(path);
        if (bytes == null) {
            System.out.println("bytes is null");
            return;
        }
        MLDataFormats.ManagedCursorInfo info = MLDataFormats.ManagedCursorInfo.parseFrom(bytes);
        StringBuilder sb = new StringBuilder();
        sb.append("markDeleteLedgerId=").append(info.getMarkDeleteLedgerId());
        sb.append(", markDeleteEntryId=").append(info.getMarkDeleteEntryId());
        sb.append(", lastActive=").append(info.getLastActive());
        System.out.println("debug vv " + sb);
        System.out.println("done");
    }

    // Connects to ZooKeeper, reads the raw bytes stored at path, and always closes the session.
    byte[] getValue(String path) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zoo = new ZooKeeper("x.x.x.x:2181", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    latch.countDown();
                }
                System.out.println("state " + event.getState());
            }
        });
        try {
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out connecting to ZooKeeper");
            }
            return zoo.getData(path, null, null);
        } finally {
            zoo.close();
        }
    }
}

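Output of testZKMetaData (first three lines) and of testZKMetaDataCursorInfo run for two subscriptions (last two lines):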

debug vv ledgerId=56929, entries=10, size=710, timestamp=1628234384802
debug vv ledgerId=56933, entries=10, size=720, timestamp=1628235295486
debug vv ledgerId=56937, entries=10, size=710, timestamp=1628240968389
debug vv markDeleteLedgerId=56933, markDeleteEntryId=9, lastActive=1628236717415
debug vv markDeleteLedgerId=56937, markDeleteEntryId=2, lastActive=1628240766897
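The two outputs can be combined to estimate how far each subscription is behind. The sketch below (`BacklogEstimate` and `entriesAfter` are hypothetical names) counts the entries after the mark-delete position, using the ledger list printed above. It is a simplification under stated assumptions: it counts entries rather than messages (a batched entry holds several messages), ignores individualDeletedMessages, and assumes the entries counts stored in ZooKeeper are current (for a ledger that is still open they may not be).

```java
// Hypothetical backlog estimate from ManagedLedgerInfo + ManagedCursorInfo values.
class BacklogEstimate {
    // ledgerIds and entries are parallel arrays in the order the ledgers are listed;
    // ledger i holds entry ids 0 .. entries[i] - 1.
    static long entriesAfter(long[] ledgerIds, long[] entries,
                             long markDeleteLedgerId, long markDeleteEntryId) {
        long backlog = 0;
        boolean passedMarkDelete = false;
        for (int i = 0; i < ledgerIds.length; i++) {
            if (passedMarkDelete) {
                backlog += entries[i]; // later ledger: counted in full (individual acks ignored)
            } else if (ledgerIds[i] == markDeleteLedgerId) {
                // entries after markDeleteEntryId in the mark-delete ledger
                backlog += Math.max(0, entries[i] - (markDeleteEntryId + 1));
                passedMarkDelete = true;
            }
        }
        if (!passedMarkDelete) {
            throw new IllegalArgumentException("mark-delete ledger not listed: " + markDeleteLedgerId);
        }
        return backlog;
    }

    public static void main(String[] args) {
        // Values copied from the output above.
        long[] ledgers = {56929, 56933, 56937};
        long[] entries = {10, 10, 10};
        System.out.println(entriesAfter(ledgers, entries, 56933, 9)); // 10
        System.out.println(entriesAfter(ledgers, entries, 56937, 2)); // 7
    }
}
```

For the first subscription, ledger 56933 is fully consumed (entry 9 is its last), so only the 10 entries of 56937 remain; the second subscription has acknowledged entries 0 to 2 of 56937, leaving 7.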

2.2 Structure of ManagedLedgerInfo and ManagedCursorInfo (MLDataFormats.proto)

syntax = "proto2";

option java_package = "org.apache.bookkeeper.mledger.proto";
option optimize_for = SPEED;

message KeyValue {
    required string key = 1;
    required string value = 2;
}

message OffloadDriverMetadata {
    required string name = 1;
    repeated KeyValue properties = 2;
}

message OffloadContext {
    optional int64 uidMsb = 1;
    optional int64 uidLsb = 2;
    optional bool complete = 3;
    optional bool bookkeeperDeleted = 4;
    optional int64 timestamp = 5;
    optional OffloadDriverMetadata driverMetadata = 6;
    repeated OffloadSegment offloadSegment = 7;
}

message OffloadSegment {
    optional int64 uidMsb = 1;
    optional int64 uidLsb = 2;
    optional bool complete = 3;
    optional int64 assignedTimestamp = 4; //timestamp in millisecond
    optional int64 offloadedTimestamp = 5; //timestamp in millisecond
    optional int64 endEntryId = 6;
    optional OffloadDriverMetadata driverMetadata = 7;
}

message ManagedLedgerInfo {
    message LedgerInfo {
        required int64 ledgerId = 1;
        optional int64 entries = 2;
        optional int64 size = 3;
        optional int64 timestamp = 4;
        optional OffloadContext offloadContext = 5;
    }

    repeated LedgerInfo ledgerInfo = 1;

    // If present, it signals the managed ledger has been
    // terminated and this was the position of the last
    // committed entry.
    // No more entries can be written.
    optional NestedPositionInfo terminatedPosition = 2;

    repeated KeyValue properties = 3;
}

message PositionInfo {
    required int64 ledgerId = 1;
    required int64 entryId = 2;
    repeated MessageRange individualDeletedMessages = 3;

    // Additional custom properties associated with
    // the current cursor position
    repeated LongProperty properties = 4;

    // Store which index in the batch message has been deleted
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
}

message NestedPositionInfo {
    required int64 ledgerId = 1;
    required int64 entryId = 2;
}

message MessageRange {
    required NestedPositionInfo lowerEndpoint = 1;
    required NestedPositionInfo upperEndpoint = 2;
}

message BatchedEntryDeletionIndexInfo {
    required NestedPositionInfo position = 1;
    repeated int64 deleteSet = 2;
}

// Generic string and long tuple
message LongProperty {
    required string name = 1;
    required int64 value = 2;
}

message ManagedCursorInfo {
    // If the ledger id is -1, then the mark-delete position is
    // the one from the (ledgerId, entryId) snapshot below
    required int64 cursorsLedgerId = 1;

    // Last snapshot of the mark-delete position
    optional int64 markDeleteLedgerId = 2;
    optional int64 markDeleteEntryId = 3;
    repeated MessageRange individualDeletedMessages = 4;

    // Additional custom properties associated with
    // the current cursor position
    repeated LongProperty properties = 5;

    optional int64 lastActive = 6;

    // Store which index in the batch message has been deleted
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}
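ManagedCursorInfo describes acknowledgement state in two parts: everything up to the mark-delete position is acknowledged, and individualDeletedMessages lists acknowledged ranges beyond it (acknowledgement "holes"). The check below is a sketch of how those fields combine. `AckState`, `Position` and `Range` are hypothetical types mirroring NestedPositionInfo and MessageRange; it assumes each range means the interval (lower, upper], open at the lower endpoint and closed at the upper one, which should be verified against ManagedCursorImpl. It also ignores batchedEntryDeletionIndexInfo and cursorsLedgerId (when that is not -1, the authoritative position is not this snapshot).

```java
import java.util.List;

// Hypothetical model of the acknowledgement state stored in ManagedCursorInfo.
class AckState {
    // Mirrors NestedPositionInfo: ordered by ledgerId, then entryId.
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Mirrors MessageRange; assumed to mean the interval (lower, upper].
    static final class Range {
        final Position lower;
        final Position upper;

        Range(Position lower, Position upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    static boolean isAcknowledged(Position p, Position markDelete, List<Range> individuallyDeleted) {
        if (p.compareTo(markDelete) <= 0) {
            return true; // at or before the mark-delete position
        }
        for (Range r : individuallyDeleted) {
            if (p.compareTo(r.lower) > 0 && p.compareTo(r.upper) <= 0) {
                return true; // inside an individually acknowledged range
            }
        }
        return false;
    }
}
```

With mark-delete at (56937, 2) and one range ((56937, 4), (56937, 6)], entry 3 is still unacknowledged, entries 5 and 6 are acknowledged, and the mark-delete position can only advance once entry 3 is acknowledged.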

2.3 Determining which broker owns a topic

The broker maps the topic to a namespace bundle, then checks whether that bundle is owned by this broker.

The ownership check is done in OwnershipCache.resolveOwnership, using ownershipReadOnlyCache:


private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> resolveOwnership(String path) {
    // Look up the bundle's ownership znode (path) through the cached ZooKeeper data
    return ownershipReadOnlyCache.getWithStatAsync(path).thenApply(optionalOwnerDataWithStat -> {
        if (optionalOwnerDataWithStat.isPresent()) {
            Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat = optionalOwnerDataWithStat.get();
            Stat stat = ownerDataWithStat.getValue();
            // The ownership znode is ephemeral: if this broker's own ZooKeeper session created it,
            // this broker already owns the bundle, so register it again in the local cache.
            if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
                LOG.info("Successfully reestablish ownership of {}", path);
                OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitUtils.suBundleFromPath(path, bundleFactory));
                if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
                    ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
                }
                ownershipReadOnlyCache.invalidate(path);
            }
        }
        return optionalOwnerDataWithStat;
    });
}
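The "topic -> bundle" step amounts to hashing the topic name into a 32-bit space and finding the bundle range that contains the hash. The sketch below only illustrates that idea: it uses java.util.zip.CRC32 as the hash and four equal bundles as the split, both of which are assumptions (the hash function and bundle boundaries in a real cluster depend on the Pulsar version and namespace configuration), and `BundleSketch` and `findBundle` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch: map a topic name to one of a namespace's bundles.
class BundleSketch {
    // Assumed boundaries: 4 equal ranges covering [0x00000000, 0xffffffff].
    static final long[] BOUNDARIES = {0x00000000L, 0x40000000L, 0x80000000L, 0xc0000000L, 0xffffffffL};

    // Unsigned 32-bit hash of the topic name, held in a long.
    static long hash(String topic) {
        CRC32 crc = new CRC32();
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Returns the range containing the hash, formatted like "0x40000000_0x80000000".
    static String findBundle(String topic) {
        long h = hash(topic);
        for (int i = 0; i < BOUNDARIES.length - 1; i++) {
            boolean last = i == BOUNDARIES.length - 2;
            // Ranges are [lower, upper), except the last one, which also includes 0xffffffff.
            if (h >= BOUNDARIES[i] && (h < BOUNDARIES[i + 1] || (last && h <= BOUNDARIES[i + 1]))) {
                return String.format("0x%08x_0x%08x", BOUNDARIES[i], BOUNDARIES[i + 1]);
            }
        }
        throw new IllegalStateException("hash outside bundle ranges: " + h);
    }

    public static void main(String[] args) {
        System.out.println(findBundle("persistent://tenant_c/ns1/topic_name"));
    }
}
```

Once the bundle is known, its owner is the broker whose ZooKeeper session created the bundle's ephemeral ownership znode, which is what resolveOwnership compares via stat.getEphemeralOwner().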

3. Summary

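  • A consumer can connect to any broker; by sending a lookup command it eventually finds the address of the broker that serves the topic, and then connects to that broker.
  • After accepting the consumer's connection, the broker sets up the topic - subscription - dispatcher - consumer chain and opens the ledger and cursor, ready to read and write messages.
  • Once connected, the consumer sends flow commands whenever it has room for more messages (not on a fixed schedule) to tell the broker it can send more; on receiving a flow command, the broker reads messages from the ledger and sends them to the consumer asynchronously.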
Author: iMine
Link: https://imine141.github.io/2021/08/09/pulsar/pulsar%E6%B6%88%E6%81%AF%E8%AF%BB%E6%B5%81%E7%A8%8B%EF%BC%88%E4%B8%80%EF%BC%89/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.