pulsar 与其他中间件的对比

一、背景

随着业务的快速变化,应用程序通信和集成变得更加重要。一个坚固的、成熟的、紧密耦合的通信基础设施成为了数字企业真正的基础,它让企业可以快速地应对变化。近年来,最有前景的数字通信方法来自开源社区,开发人员在这里合作,为构建数字世界的共同挑战提供解决方案。这些解决方案有 Kafka,RabbitMQ等。因为满足不断变化的业务需求需要仔细考虑,所以在本白皮书中,我们定义了几个开源或商业选项,并列出了它们的优点、缺点以及相关复杂性和成本信息。

二、messaging 简介

当计算机开始通讯时,messaging 就成为了系统之间通讯的基础。最开始的消息传递是在阿帕网,它是第一个广域分组交换网络,使用以太网、TCP/IP等网络层协议,来进行系统间的通讯。随着阿帕网的发展,更多的系统相互连接,成为今天的互联网,通信原则开始从网络层进入到应用层。随着这些进步,抽象层出现了,简化了系统连接和通信的方式,这也成为消息传递技术的目标。

多年来,为不同类型的通信发明了新的通信协议。Java消息服务(JMS)和数据分发服务(DDS)规范协议在90年代末和21世纪初出现。许多应用程序开始使用像HTML和HTTP这样的协议,而不是最初设计的协议。每个人都在寻找一个协议,它可以适用于任何东西,但现实是,永远不会有一个单一的通信方法。数字通信将始终是多种方法和模式的混合体。

三、各个MQ简介

1. RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求在其次。RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。在性能不如 Kafka,但也可以做一些性能上的优化。

特性
1. 所需的技能

了解AMQP协议和规范。

2. 复杂性

API 和协议级别的通用数据交换方法意味着数据分发的选项呈指数级增长。

3. 优点
  • 基于AMQP协议,操作行为的定义非常明确
  • 简易灵活的部署方式
  • 在低吞吐量的情况下,有着非常低延迟
  • 文档丰富和社区活跃
4. 缺点
  • 吞吐量限制,单机峰值约万/s
  • 维护复杂,在弹性伸缩、高可用性、数据复制等不如 pulsar、kafka
  • 缺乏本地流处理、消息回溯、消息压缩等
5.总成本

RabbitMQ 易于部署和维护。但是由于是Erlang开发的,功能扩展和二次开发代价高。

6. 性能
  • 并发一般,万级别 TPS
  • 延迟低,微秒级别
  • 可扩展性一般
  • 支持分布式,RabbitMQ 是为大规模部署而设计的,但通常需要大规模的服务器基础架构来支持可扩展的环境,以及复杂的路由来支持全球分布式结构

2.RocketMQ

RocketMQ 是由阿里巴巴消息中间件团队研发的一款高性能、高吞吐量、低延迟、高可用、高可靠的分布式消息中间件,参考了优秀的开源消息中间件Kafka,天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商领域中,以及业务削峰。开源后并于2016年捐赠给Apache社区孵化,目前已经成为了Apache顶级项目。

特性
1. 所需的技能

需要对消息传递,消息事物,消息重试机制等原理有基本理解。

2. 复杂性

相对于 kafka,不需要部署额外的 Zookeeper,内置了 nameserver ,部署简单。

3. 优点
  • 满足多种需求,支持有序消息、延迟消息、消息回溯、消息积压等等
  • 保证高可用,能够大规模集群化部署
  • java 开发,阅读源代码、扩展、二次开发方便
4. 缺点
  • 消息重复问题,它不能保证不重复
  • 延迟消息,只支持固定 level
  • 社区活跃度和文档都相较一般
5. 总成本

RocketMQ 易于部署和维护,但与任何开源解决方案一样,随着基础设施的扩展,支持和维护它的成本也会增加。

6. 性能
  • 并发高,十万级别 TPS
  • 延迟低,毫秒级别
  • 可扩展性强,集群可以同时进行水平方向和垂直方向的缩放
  • 支持分布式,原生支持高可用集群,分布式扩展设计

3.Kafka

Kafka 是一个分布式、分区的、多副本的,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等。由 Linkedin 公司开发,于2010年贡献给了 Apache 基金会并成为顶级开源项目。

特性
1. 所需的技能

了解消息传递和底层操作系统功能,如存储和网络通信。需要学习其他组件,如Zookeeper等。

2. 复杂性

相对容易开箱即用。当需要诸如安全性、复制和全局分发等功能时,复杂性就会增加。

3. 优点
  • 性能强大,拥有百万级的吞吐量
  • 支持数据持久、分布式流处理、跨域复制等功能
  • 与周边系统的兼容性好,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持Kafka
  • 文档丰富和社区活跃
4. 缺点
  • 没有与租户完全隔离的本地多租户
  • 长期存储数据昂贵,尽管可以长时间存储,但是由于成本问题却很少用到它
  • 运维困难,必须提前计划和计算 broker、topic、分区和副本的数量(确保计划的未来使用量增长),以避免扩展问题
  • 在大量 topic 下,吞吐量会大幅下降
5. 总成本

Kafka 很简单,相对容易启动和运行,特别是对于中小型项目。开源并不意味着免费,Kafka 不断达到企业规模需要专门的支持人员来维护基础设施。

6. 性能
  • 并发高,百万级别 TPS
  • 延迟低,毫秒级别
  • 可扩展性好,集群可以同时进行水平方向和垂直方向的缩放
  • 支持分布式

4.ADMQ

ADMQ 是基于Apache Pulsar 的二次开发消息中间件,是下一代云原生分布式消息流平台。在许多方面,Pulsar 与 Kafka 相似,但在规划部署方面差别很大。Pulsar 最初只是传统的消息传递系统,后面加入了流处理的功能。这对于那些希望部署大规模系统同时要求更少的复杂性的用户来说是非常有吸引力的。Pulsar 企业级的分布式对于多租户和数据复制的功能提供了一个开箱即用的支持,从而简化了随着时间的推移不断增长的资源使用和采用。

特性
1. 所需的技能

对消息传递和底层操作系统功能的基本理解,如存储和网络通信。需要学习其他组件,如Zookeeper,Bookkeeper等。

2. 复杂性

一种简化的封装方法,其中所有功能都可以集中访问,从而降低了扩展到企业级别时的复杂性。

3. 优点
  • 性能强大,比 kafka 更稳定的低延迟、高吞吐量
  • 存储与计算分离,无需移动数据即可添加或使用 broker,运维方便
  • 分层存储,历史数据可以使用其他廉价的存储
  • 地理复制和内置 Discovery,易于将集群复制到多个区域
  • 多租户,不同的团队可以使用相同的集群并将其隔离,解决了许多管理难题
  • Function,易于部署、轻量级计算过程、对开发人员友好的 API,无需运行自己的流处理引擎
  • 多协议支持,支持 RocketMQ、AMQP、Kafka 协议等,容易与实现这些协议的中间件进行集成
  • 社区相对活跃,能够及时反馈。
4. 缺点
  • 部署比较复杂,除了 Pulsar 还有另外有Zookeeper、Bookkeeper两个组件
  • 相对缺乏支持、文档和案例
  • 插件和客户端相对 Kafka 较少
5. 总成本

Pulsar 需要花费更多的精力来启动和运行,但一旦部署,它就可以很好地扩展到企业级别。开源并不意味着免费的,企业运行 Pulsar 通常需要专门的支持人员来维护基础设施。使用ADMQ商业版,金蝶天燕可以提供完美支持和维护。

6. 性能

并发高,百万级别 TPS

延迟低,毫秒级别

可扩展性强,集群可以同时进行水平方向和垂直方向的缩放

支持分布式,内置了分布式和数据复制的本地支持

四、总结

如今,在选择消息传递通信产品时,企业比其他任何时候都面临着困难的挑战。虽然单个解决方案的总体拥有成本较低,但没有一个解决方案能够满足所有应用程序的所有需求。消息传递必须更加全面,以满足特定和不同的应用程序需求—包括高性能/低延迟事件处理、流分析的流数据、不同应用程序之间本地集成的微服务等等。

如今 ADMQ 将流处理和传统消息队列统一,加上跨地域复制、无状态broker、分层存储、多协议支持,让 ADMQ 成为一个全新的消息中间件。提供给不同企业不同需求的对应解决方案。

对比概览

对比项 RabbitMQ(AMQP) RocketMQ Kafka ADMQ
成熟度 成熟 成熟 成熟 一般
时效性 微秒级别 毫秒级别 毫秒级别 毫秒级别
请求TPS 万级别 十万级别 百万级别 百万级别
可靠性 一般 一般
可用性 高(主从构架) 高(主从构架) 非常高(分布式) 非常高(分布式)
定时消息 支持 支持(固定level) 不支持 支持
事物 支持 支持 支持 支持
副本同步策略 Master-Slave同步 Master-Slave同步 多机异步 多机异步
多租户 支持部分 支持部分 支持部分 支持
动态扩容 集群扩容依赖前端 需同步配置 需手动执行rebalance 友好,及时扩容
故障恢复 不友好 不友好 较友好 友好
消息堆积能力 影响性能 支持 支持 支持
消息回溯 不支持 支持 支持 支持
顺序消息 不支持 支持 支持 支持
安全防护 一般 一般 一般
社区活跃度
文档 一般
特点 erlang语言发开,性能一般,出现比较早,有一定的用户基数 各个环节分布式扩展设计,主从HA,多种消费模式,性能好 高吞吐量、持久化数据存储、分布式系统易于扩展,性能极好 灵活、多租户、云原生架构、跨地域复制,性能极好

LongAdder

为什么要使用Longadder?

传统的原子操作类 AtomicLong 。他的原子性是通过 CAS 原理来实现的,但是有一个问题,如果并发大了,所有请求线程只有一个线程会成功,其他的线程都需要自旋等待。而自旋等待的cpu消耗是很大的。所以在并发量大的情况下,Atomiclong的性能是比较低的

LongAdder 的原理就是,加入了分段的概念,每个线程都有属于自己的桶位,线程针对自己桶位的值 做cas计算。

LongAdder流程

base变量:非竞争条件下,直接累加到该值上

cell[]数组:竞争条件下,累加到各自线程的槽中cell[i]

竞争条件下调用get方法获取LongAdder的值,才会去统计所有cell数组中的值


jstack分析线程运行状态
**1.通过ps -ef|grep {*program*Name} 找到程序进程ID
[root@10 keep_rule_sender]# ps -ef|grep java
root 4896 18614 0 15:52 pts/0 00:00:00 grep --color=auto java
root 15550 15547 0 5月25 ? 00:24:08 ./jre/bin/java -jar -Dfile.encoding=UTF-8 -XX:+UseG1GC -Xms16384M -Xmx16384M -Xmn8192M -Xss2M AuthCenter.jar

2.top -Hp pid 查看该进程对应的各个线程的运行状态

[root@10 keep_rule_sender]# top Hp 15550
top - 15:55:19 up 115 days, 54 min, 1 user, load average: 0.31, 0.36, 0.40
Threads: 146 total, 0 running, 146 sleeping, 0 stopped, 0 zombie
%Cpu(s): 3.2 us, 0.7 sy, 0.0 ni, 96.1 id, 0.0 wa, 0.0 hi, 0.1 si, 0.0 st
KiB Mem : 13157919+total, 62099732 free, 26188756 used, 43290704 buff/cache
KiB Swap: 4194300 total, 4194300 free, 0 used. 10426378+avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
15937 root 20 0 27.453g 9.252g 13756 S 0.3 7.4 0:16.47 java
15947 root 20 0 27.453g 9.252g 13756 S 0.3 7.4 0:16.73 java

3.jstack pid > stack.txt 保存该程序各个线程当前的执行状态到本地文件stack.txt中

4.查看对应线程的状态

把top看到的线程号(pid)转换成16进制

在stack.txt文件中查看对应线程的执行状态

"nioEventLoopGroup-3-52" #78 prio=10 os_prio=0 tid=0x00007fe8b012c000 nid=0x3e41 runnable [0x00007fe7eb7fa000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000003c0056660> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000003c0056608> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000003c0056618> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:639)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:325)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)


Pulsar订阅进度同步原理

描述

Pulsar自带的跨集群数据复制,可以把一个集群收到的消息复制到其他集群上,这样消费者正在连接的集群挂掉后,可以从其他集群消费消息。

除了消息的复制,pulsar还支持订阅进度的同步,具体场景如下:

部署了三个集群A、B、C,并设置集群之间的消息两两复制。消费者A刚开始从集群A消费消息,当集群A挂掉后,消费者A切换到集群B消费消息,这时候消费者A不需要从头开始读取集群B上的消息,而是可以从上一次在集群A上消费失败的位置继续从集群B上消费消息。

实现

集群间的消息复制是通过内置生产者和消费者实现的,接收消息的集群上会启动一个消息读取进程,读取发送到该集群的消息,然后通过客户端接口发送到其他集群上。

也就是说消息的复制并不是直接把bookie中的存储文件复制到其他集群上,而是使用内置的订阅名称读取本地消息,然后发送到其他集群。这样的话,两个集群的消息元数据就不是一样的,同一条消息在多个集群的消息ID是不一样的。

所以如果想实现订阅进度的同步,需要把同一条消息在多个集群的消息ID关联上,然后在各个集群间同步上次消费完毕的消息ID,这样不同集群之间就都知道消费者上次消费确认的位置并能够移动本集群的消费位点了。

所以实现订阅进度同步的关键点在于

  • 集群间能相互通信

  • 同一条消息在多集群间的关联关系

  • 消费进度更新时能及时通知到其他集群

具体方法

Pulsar本身的消息同步并不是实时的、严格的同步,只是避免大范围的重复消费,还是会有一小段消息的重复。

Pulsar订阅进度同步的流程

  • 发布快照通知给其他集群。

  • 其他集群收到通知后,记录本集群的LAC指针并发送给开启同步流程的集群。

  • 开启流程的集群收到其他所有集群的返回消息后,分别建立其他集群LAC和本集群上一次消息ID的关联关系。

  • 开启流程的集群建立好消息的ID的关联关系后,生成一个快照消息,并插入到消息队列中。

  • 消费者返回确认后,会根据消息ID检测上一个快照,找到后就说明消费位置已经超过快照了,需要通知其他消费者更新消费位置了。

  • 其他集群收到位置更新通知后更新本集群的消费位置。

下面以某个场景详细说明下。

场景描述:

集群A、B、C三个集群互相同步消息,生产者都发送消息到集群A上。

集群A发布一个快照通知

创建Topic时会检测是否需要同步订阅进度,如果需要则通过ReplicatedSubscriptionsController启动一个定时任务,任务的功能就是发送一个通知,代码如下:

ReplicatedSubscriptionsController

private void startNewSnapshot() {
cleanupTimedOutSnapshots();
// 判断是否有新消息写入了
if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
}
return;
}

MutableBoolean anyReplicatorDisconnected = new MutableBoolean();
topic.getReplicators().forEach((cluster, replicator) -> {
if (!replicator.isConnected()) {
anyReplicatorDisconnected.setTrue();
}
});

// 当所有集群都正常时才会发送通知。
if (anyReplicatorDisconnected.isTrue()) {
// Do not attempt to create snapshot when some of the clusters are not reachable
if (log.isDebugEnabled()) {
log.debug("[{}] Do not attempt to create snapshot when some of the clusters are not reachable.",
topic.getName());
}
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Starting snapshot creation.", topic.getName());
}

pendingSnapshotsMetric.inc();
ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
pendingSnapshots.put(builder.getSnapshotId(), builder);
// 这里只是通过往该Topic写入一条消息,消息中包含通知ID。
builder.start();
}

ReplicatedSubscriptionsSnapshotBuilder

void start() {
if (log.isDebugEnabled()) {
log.debug("[{}] Starting new snapshot {} - Clusters: {}", controller.topic().getName(), snapshotId,
missingClusters);
}
startTimeMillis = clock.millis();
// 设置通知ID(快照ID)和本集群名称。
controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
}

集群B和C收到通知

集群A写入消息到该Topic后,该条消息会和其他消息一样发送到集群B和集群C上。

由于集群间是相互复制消息,所以集群B和C的复制模块能够读取到该消息,并作相应的处理。

PersistentReplicator

private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
if (!msg.getMessageBuilder().hasMarkerType()) {
// No marker is defined
return;
}

int markerType = msg.getMessageBuilder().getMarkerType();

// 这里确保只处理消息发送方集群发送的消息,即只处理集群A发送的消息。
if (!(msg.getMessageBuilder().hasReplicatedFrom()
&& remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) {
// Only consider markers that are coming from the same cluster that this
// replicator instance is assigned to.
// All the replicators will see all the markers, but we need to only process
// it once.
return;
}

// 需要处理的消息包含三类:通知、通知应答、消费位置更新。
switch (markerType) {
case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE:
case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE:
case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE:
topic.receivedReplicatedSubscriptionMarker(position, markerType, payload);
break;

default:
// Do nothing
}
}

收到通知后,会获取LAC,然后封装LAC,写入到该Topic中。

ReplicatedSubscriptionsController

private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
// if replicator producer is already closed, restart it to send snapshot response
Replicator replicator = topic.getReplicators().get(request.getSourceCluster());
if (!replicator.isConnected()) {
topic.startReplProducers();
}

// Send response containing the current last written message id. The response
// marker we're publishing locally and then replicating will have a higher
// message id.
// 得到集群上次写入的消息
PositionImpl lastMsgId = (PositionImpl) topic.getLastPosition();
if (log.isDebugEnabled()) {
log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
}

// 把消息ID连同通知ID一起写入到Topic中。
ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
request.getSnapshotId(),
request.getSourceCluster(),
localCluster,
lastMsgId.getLedgerId(), lastMsgId.getEntryId());
writeMarker(marker);
}

集群A建立本集群和集群B、C间消费ID的关联关系

集群B和C写入的消息会被集群B和C的复制模块发送到集群A,集群A的复制模块会读到该消息并做相应的处理。

ReplicatedSubscriptionsController

private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
String snapshotId = response.getSnapshotId();
ReplicatedSubscriptionsSnapshotBuilder builder = pendingSnapshots.get(snapshotId);
if (builder == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received late reply for timed-out snapshot {} from {}", topic.getName(), snapshotId,
response.getCluster().getCluster());
}
return;
}

builder.receivedSnapshotResponse(position, response);
}

ReplicatedSubscriptionsSnapshotBuilder

// 该Position是集群A和B返回的消息在集群A上的Position。
synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received response from {}", controller.topic().getName(),
response.getCluster().getCluster());
}
String cluster = response.getCluster().getCluster();
// Response中包含集群A/集群B中的LAC消息。
responses.putIfAbsent(cluster, new MarkersMessageIdData().copyFrom(response.getCluster().getMessageId()));
missingClusters.remove(cluster);

if (log.isDebugEnabled()) {
log.debug("[{}] Missing clusters {}", controller.topic().getName(), missingClusters);
}

if (!missingClusters.isEmpty()) {
// We're still waiting for more responses to come back
return;
}

// We have now received all responses
// 如果超过两个集群需要两次通知,这里没看懂为啥...
if (needTwoRounds && !firstRoundComplete) {
// Mark that 1st round is done and start a 2nd round
firstRoundComplete = true;
missingClusters.addAll(remoteClusters);

controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Snapshot is complete {}", controller.topic().getName(), snapshotId);
}
// Snapshot is now complete, store it in the local topic
// 这里是建立记录集群B/集群C的LAC消息和集群A收到集群B/集群C是保存LAC的消息的消息ID间的关联关系,而且是最后边一个消息。
// 所以这里不能实现同一条消息在三个集群间的关联。
PositionImpl p = (PositionImpl) position;
controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
p.getLedgerId(), p.getEntryId(), responses));
controller.snapshotCompleted(snapshotId);

double latencyMillis = clock.millis() - startTimeMillis;
snapshotMetric.observe(latencyMillis);
}

集群A建立好关联关系后,会生成一个快照,这个快照包含各集群消息ID的关联关系、

然后写入到该Topic中,之后这条消息就会复制到集群B和集群C上。

创建快照缓存

虽然上边的步骤已经生成快照了,但是快照的缓存并不上在上边添加的。当快照被写到Topic中,那么当消费者读取消息时就肯定能读到该快照。

当消息在Broker端被读到时,通过过滤器处理这些消息。

AbstractBaseDispatcher

public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
totalEntries++;
ByteBuf metadataAndPayload = entry.getDataBuffer();
int entryWrapperIndex = i + entryWrapperOffset;
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
? entryWrapper.get()[entryWrapperIndex].getMetadata()
: null;
msgMetadata = msgMetadata == null
? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
: msgMetadata;
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
// 省略代码 ...

} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
// 这里处理读到的快照消息。
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}

entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()
&& trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
entry.release();
continue;
}

// 省略代码 ...
}
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
// Remove the protobuf headers
Commands.skipMessageMetadata(headersAndPayload);

try {
ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
subscription.processReplicatedSubscriptionSnapshot(snapshot);
} catch (Throwable t) {
log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
return;
}
}

PersistentSubscription


@Override
public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
}
}

ReplicatedSubscriptionSnapshotCache

public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
// 这里的本地消息ID,就是保存集群B/集群C LAC消息的消息ID,且是最新的一个。
MarkersMessageIdData msgId = snapshot.getLocalMessageId();
PositionImpl position = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId());

if (log.isDebugEnabled()) {
log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position,
snapshot.getSnapshotId());
}

snapshots.put(position, snapshot);

// Prune the cache
while (snapshots.size() > maxSnapshotToCache) {
snapshots.pollFirstEntry();
}
}

至此快照从存储中被加载到Broker内存中。

集群A更新进度

当消费者订阅消息时,快照才会被加载到Broker内存里,然后消费者返回确认消息时,会查找消息ID的上一个快照。

PersistentSubscription

@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();

// 省略代码 ...

// 当可删除位置更新了(注意不能处理单独确认的消息)
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
this.updateLastMarkDeleteAdvancedTimestamp();

// Mark delete position advance
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
.advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
if (snapshot != null) {
topic.getReplicatedSubscriptionController()
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
}
}
}

deleteTransactionMarker(properties);

if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
if (dispatcher != null) {
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
}
}
}

如果能找到快照,则说明需要更新进度了。

这个进度是上一次的快照进度,不是最新消费的进度。

ReplicatedSubscriptionsController

public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
snapshot.getClustersList().stream()
.map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
.collect(Collectors.toList()));
}

Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
ClusterMessageId cmid = snapshot.getClusterAt(i);
clusterIds.put(cmid.getCluster(), cmid.getMessageId());
}

ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
writeMarker(subscriptionUpdate);
}

集群B和C更新进度

更新进度的消息会被写入到Topic中,这个消息也会被复制到集群B和C。

集群B和C中的复制模块读到消息后会做如下处理:

ReplicatedSubscriptionsController

private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
MarkersMessageIdData updatedMessageId = null;
for (int i = 0, size = update.getClustersCount(); i < size; i++) {
ClusterMessageId cmid = update.getClusterAt(i);
if (localCluster.equals(cmid.getCluster())) {
updatedMessageId = cmid.getMessageId();
}
}

if (updatedMessageId == null) {
// No updates for this cluster, ignore
return;
}

// 这里的消息ID是上一次的LAC。
Position pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received update for subscription to {}", topic, update.getSubscriptionName(), pos);
}

// 更新消费者读取进度为上一次的LAC。
PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName());
if (sub != null) {
sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
} else {
// Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because
log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription",
topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos);
topic.createSubscription(update.getSubscriptionName(),
InitialPosition.Latest, true /* replicateSubscriptionState */);
}
}

可以看到,消息进度的同步是由延迟的,而且是不是精确的。

其他说明

建立不同集群间消息ID的关联关系过程,是一个不精确的过程。集群A只是发送了一个快照通知,集群B和集群C把当前LAC发送给集群A,集群A读到这个LAC时,已经有新的消息写入到集群A了。此时集群A把集群B和集群C上次的LAC和本地集群中相对靠前的消息ID建立了关联关系。

比如:

集群A收到1、2、3三条消息,发送通知给集群B和集群C。

集群B读取到3后,可能有4、5、6又发送到集群B了,此时集群B发送的LAC是6。

集群C读取到3后,可能有4、5、6、7又发送到集群C了,此时集群C发送的LAC是7。

集群A读取到6后,记录下来,等待集群C的LAC。

集群A读取到7后,可能有8、9发送到集群A了,此时存储7的消息ID变成了10。

集群A建立的关联关系是10 - 6(集群B)- 7(集群C)。

当集群A的消费者读取到10的时候,把快照缓存到Broker的内存里,读取到11的时候,发现前边有一个快照。然后记录更新通知,写到Topic里。

集群B和集群C读取到快照时,会把进度分别更新到6和7,但此时消费者其实已经读取到11了。

按照这种解释,订阅进度永远不可能完全一致,即使集群A长时间都没收到消息了,集群B和集群C的消费进度也不会和集群A一致。


fastjson和jackson冲突造成的数据解析问题

1. 版本

pulsar:2.8.0

fastjson:1.2.76

2. 问题

在pulsar项目中引入fastjson后,pulsar中某些admin api不能正常使用,会报NPE错误。

执行命令会报错:

[mac bin]$ ./pulsar-admin namespaces set-backlog-quota --limit 100M --limitTime 1111 --policy producer_request_hold sample/ns118:38:54.968 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://x.x.x.x:8080/admin/v2/namespaces/sample/ns1/backlogQuota] Failed to perform http post request: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server ErrorHTTP 500 Internal Server Error
Reason: HTTP 500 Internal Server Error[mac bin]$

查看pulsar的日志:

11:50:05.098 [pulsar-web-63-6] ERROR org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Failed to update backlog quota map for namespace sample/ns1org.apache.pulsar.metadata.api.MetadataStoreException: com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: org.apache.pulsar.common.policies.data.Policies["backlog_quota_map"]->java.util.LinkedHashMap["destination_storage"]->com.sun.proxy.$Proxy117["limitSize"])  at org.apache.pulsar.broker.resources.BaseResources.set(BaseResources.java:94) ~[org.apache.pulsar-pulsar-broker-common-2.8.0.jar:2.8.0]  at org.apache.pulsar.broker.admin.impl.NamespacesBase.internalSetBacklogQuota(NamespacesBase.java:1387) ~[pulsar-broker-2.8.0.jar:2.8.0]  at org.apache.pulsar.broker.admin.v2.Namespaces.setBacklogQuota(Namespaces.java:724) ~[pulsar-broker-2.8.0.jar:2.8.0]  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_231]  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_231]  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_231]  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_231]  at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]  at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]  at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]  at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]  at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]  at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:65) ~[pulsar-broker-2.8.0.jar:2.8.0]  at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1435) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1350) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:179) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:380) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:383) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]  at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]

虽然能明显看出来是json解析报错引起的空指针问题,但是不知道是哪一个步骤造成解析失败。

3. 问题解决

经过一系列的排查后,发现是fastjson自动注册provider引起的。

pulsar使用jersey框架作为restful接口的处理,同时使用jackson作为json解析框架,但是项目引入fastjson后,解析框架就会被替换成fast接送。

所以解决方法就是找到注册相关的代码,设置不自动注册。

查看fastjson中注册的代码:

// 这里继承jersey的AutoDiscoverable接口,并设置注册优先权高于jersey的默认权限,是为了优先注册fastjson的解析框架。
@Priority(AutoDiscoverable.DEFAULT_PRIORITY - 1)
public class FastJsonAutoDiscoverable implements AutoDiscoverable {
public static final String FASTJSON_AUTO_DISCOVERABLE = "fastjson.auto.discoverable";
public volatile static boolean autoDiscover = true;
static {
// 获取系统属性
try {
autoDiscover = Boolean.parseBoolean(System.getProperty(FASTJSON_AUTO_DISCOVERABLE, String.valueOf(autoDiscover)));
} catch (Throwable ex) {
//skip
}
}
public void configure(final FeatureContext context) {
final Configuration config = context.getConfiguration();
// Register FastJson. 如果设置了自动注册,且还没有注册过,则进行注册。
if (!config.isRegistered(FastJsonFeature.class) && autoDiscover) {
context.register(FastJsonFeature.class);
}
}
}

所以如果不想用fastjson,则需要设置自动注册为false:

System.setProperty(FastJsonAutoDiscoverable.FASTJSON_AUTO_DISCOVERABLE, "false");

这个是全局变量,所以需要在程序启动的时候设置,如果在jersey加载完之后在设置就没有效果了。

4. 问题排查

解决办法虽然很简单,一行代码就可以了,但是找问题却耗费了很长时间,下面记录下查找问题做的一些工作。

4.1 问题确认

出现问题的原因是我们添加了自己的业务逻辑,那么就要确认这个问题是新增代码引起的还是以前就有的bug。

所以首先需要把环境恢复成修改之前的环境,然后测试。

结果发现之前的环境没问题,所以确认出现问题的原因就是最近新加的代码。

4.2 在问题出现的地方找原因

通常情况下,对于新出现的问题,我们不可能马上明白原因是啥,不知道是之前哪一块逻辑修改造成的,尤其是代码可能不是自己写的。

所以,最简单的就是在问题出现地方的前后打日志,理清前后逻辑。

该问题出现在

org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl:


@Override
public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyFunction) {
log.info("vvv method {}, path {}", "readModifyUpdate0", path);
return executeWithRetry(() -> objCache.get(path)
.thenCompose(optEntry -> {
if (!optEntry.isPresent()) {
return FutureUtils.exception(new NotFoundException(""));
}

CacheGetResult<T> entry = optEntry.get();
T currentValue = entry.getValue();

long expectedVersion = optEntry.get().getStat().getVersion();

log.info("vvv method {}, path: {}, class: {}, value: {}, hash: {}, expectedVersion: {}, p: {}",
"readModifyUpdate", path, currentValue.getClass(), currentValue, currentValue.hashCode(),
expectedVersion, this.hashCode());

if (currentValue instanceof Policies) {
Policies p = (Policies) currentValue;
if (p.backlog_quota_map.size() > 0) {
p.backlog_quota_map.forEach((key, value) -> {
log.info("vvv method {}, path {}, key {}, value {} {}", "readModifyUpdate", path, key,
value.getClass(), value);
});
}
}


T newValueObj;
byte[] newValue;
try {
// Use clone and CAS zk to ensure thread safety
// 问题出现在这里
currentValue = serde.deserialize(serde.serialize(currentValue));
// apply方法可以拿到之前方法的值。
newValueObj = modifyFunction.apply(currentValue);
log.info("vvv method {}, path: {}, class: {}, newValue: {}", "readModifyUpdate", path,
newValueObj.getClass(), newValueObj);

newValue = serde.serialize(newValueObj);
} catch (Throwable t) {
return FutureUtils.exception(t);
}

return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(stat -> {
// Make sure we have the value cached before the operation is completed
log.info("vvv method {}, path {}", "readModifyUpdate-put", path);
objCache.put(path,
FutureUtils.value(Optional.of(new CacheGetResult<>(newValueObj, stat))));
}).thenApply(__ -> newValueObj);
}), path);
}

通过阅读该类的代码,可以发现:

  • 该类缓存了一些从zookeeper中查询的数据,缓存使用的框架是Caffeine。

  • 缓存中的数据会定时老化。

  • 该类提供了数据修改方法,接收到修改指令后会更新到zookeeper中,然后更新缓存数据。

  • 在上述方法中,接收到更新指令后,首先根据path查找缓存中是否存在,如果不存在则直接返回;如果存在,则获取数据和数据版本(对应zookeeper中的版本),把旧数据序列化后再反序列化(不知道意义是啥),然后把新数据保存到zookeeper中,保存成功后更新缓存数据。

  • 缓存初始化的方法是readValueFromStore用于从zookeeper中查询数据。

打包后替换线上环境,并看下打印的日志:

11:50:05.095 [pulsar-web-63-6] INFO  org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl - vvv method readModifyUpdate, path: /admin/policies/sample/ns1, class: class org.apache.pulsar.common.policies.data.Policies, value: Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={destination_storage={}}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=null, subscription_expiration_time_minutes=0, retention_policies=null, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=Full, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=true, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null), hash: -2056889852, expectedVersion: 0, p: 1890262240

可以看到backlog_quota_map={destination_storage={}}中destination_storage是一个空值,所以当序列化的时候会抛出NPE异常。

正常情况下的日志应该是:

10:49:44.719 [metadata-store-32-1] INFO  org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl - vvv method readValueFromStore, path /admin/policies/sample/ns1, class class org.apache.pulsar.common.policies.data.Policies, value Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={destination_storage=BacklogQuotaImpl(limitSize=104857600, limitTime=1111, policy=producer_request_hold)}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=null, subscription_expiration_time_minutes=0, retention_policies=null, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=Full, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=true, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null), hash 1614905576, version: 1, p: 1843660571

destination_storage是有内容的,内容是:

BacklogQuotaImpl(limitSize=104857600, limitTime=1111, policy=producer_request_hold)。

由于这里的数据是从缓存中获取的,所以怀疑从缓存中拿到数据就是空,但是在readValueFromStore方法中打印日志后,发现这里的数据并不是空,而且内容和readModifyUpdate中不一样。所以就能确认在其他地方修改了缓存中的数据

4.3 查看调用链

通过debug可以确认调用链是:

  • org.apache.pursar.broker.admin.v2.Namespaces -> setBacklogQuota

  • org.apache.pursar.broker.admin.impl.NamespaceBase -> internalSetBacklogQuota

  • org.apache.pulsar.broker.resources -> set

  • org.apache.pulsar.broker.resources -> setAsync

  • org.apache.pulsar.metadata.cache.impl -> readModifyUpdate

第一个方法就是提供了http接口,接收客户端传递的参数。

public void setBacklogQuota(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
@ApiParam(value = "Backlog quota for all topics of the specified namespace")
BacklogQuota backlogQuota) {
validateNamespaceName(tenant, namespace);
log.info("vvv method {}, backlogQuota {}, class {}", "setBacklogQuota", backlogQuota, backlogQuota.getClass());
internalSetBacklogQuota(backlogQuotaType, backlogQuota);
}

此处打印的日志如下:

11:50:05.094 [pulsar-web-63-6] INFO  org.apache.pulsar.broker.admin.v2.Namespaces - vvv method setBacklogQuota, backlogQuota {}, class com.sun.proxy.$Proxy117 {}

第二个方法:

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
try {
final String path = path(POLICIES, namespaceName.toString());
// 从MetaCacheImpl中获取缓存中的数据。
Policies policies = namespaceResources().get(path)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist"));
RetentionPolicies r = policies.retention_policies;
if (r != null) {
Policies p = new Policies();
p.backlog_quota_map.put(quotaType, backlogQuota);
if (!checkQuotas(p, r)) {
log.warn(
"[{}] Failed to update backlog configuration"
+ " for namespace {}: conflicts with retention quota",
clientAppId(), namespaceName);
new RestException(Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for namespace."
+ " Please increase retention quota and retry");
}
}
// 修改缓存中的数据(这里是直接修改缓存中的数据,所以在之前的日志中,
// 我们发现readModifyUpdate方法中拿到的缓存对象的内容和缓存加载方法readValueFromStore中的不一致)。
policies.backlog_quota_map.put(quotaType, backlogQuota);
log.info("vvv method {}, path {}, policies {}", "internalSetBacklogQuota", path, policies);
namespaceResources().set(path, p -> policies);
log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName,
jsonMapper().writeValueAsString(backlogQuota));

} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}

这个方法中,先是从MetaCacheImple对象中根据path拿到缓存中的policies,然后更新该policies,然后再通过MetaCacheImpl的update方法更新zookeeper中的数据。

通过上述流程,我们可以确定的是,缓存从zookeeper中加载的数据是正确的,客户端调用命令后传递的数据是错误的,所以定位到问题出现在调用链的第一个方法中。

4.4 问题分析

从前面的过程中,我们就能大致确认问题是客户端传递的参数没有被正常解析或者客户端传递的参数为空,然后pulsar本身使用jersey作为restful框架,使用jackson解析请求中的json数据,并对请求参数进行赋值。

首先通过tcpdump捕获交互过程中的数据包,确定客户端发送的数据中是有内容的:

image-20231007103138364

可以看到传递的内容不为空,然后看不报错情况下的数据报文:

image-20231007103221655

由此就可以确定不是客户端的问题,而是broker端接收到数据后的解析问题。

结合以往经验,推测是不同的json处理框架有冲突,导致数据解析失败。把fastjson去掉后,果然问题不再出现!

4.5 问题解决

既然是fastjson的问题,那么推测是加载fastjson包后,会在某些情况下替换jackson进行json数据的解析,导致解析出错。

pulsar是基于jersey作为restful框架的,而jersey遵循了javax-ws规范,规范中定义了加载哪些数据解析类,即定义了加载哪一个json框架作为解析框架。

搜索关键词:fastjson和jersey冲突,发现fastjson项目中的一个issue:

https://github.com/alibaba/fastjson/issues/1392

有人遇到了相似的问题,fastjson进行了修复。

我们的问题与上述问题还是有些不同的,但本质一样,都是由于fastjson默认加载了自己的作为java-ws的provider,导致jersey不能加载jackson。

而fastjson又不够强大,如果参数中的变量类型是interface,fastjson不能找到该interface的实现类并赋值,所以我们看到setBacklogQuota方法中backlogQuota是空的,而且class是com.sun.proxy.$Proxy117。

如果我们把BacklogQuota换成backlogQuotaImpl或者我们自己写的一个包含三个参数的类,则能成功赋值,例如这样:


@Data
@AllArgsConstructor
@NoArgsConstructor
public class VVData {
private long limitSize;
// backlog quota by time in second
private int limitTime;
private BacklogQuota.RetentionPolicy policy;
}

打印结果:

12:14:01.371 [pulsar-web-63-12] INFO  org.apache.pulsar.broker.admin.v2.Namespaces - vvv method setBacklogQuota, backlogQuota VVData(limitSize=104857600, limitTime=1111, policy=producer_request_hold

5. 疑问

上边问题已经解决了,但是在找问题的过程中,发现可以通过其他地方修改Caffeine缓存中的数据。

这里在多线程同时操作情况下有出现数据不一致的问题吧

fastjson不会自动把自己设置为java-ws的provider,应该是jersey在启动的时候查找AutoDiscoverable接口的所有实现类,然后根据实现类里面设置的优先级加载优先级最高的provider,或者按照优先级顺序加载所有的provider。AutoDiscoverable的代码如下:

package org.glassfish.jersey.internal.spi;
import javax.ws.rs.core.FeatureContext;
/** * A service provider contract for JAX-RS and Jersey components that need to be automatically discovered and registered in * {@link javax.ws.rs.core.Configuration runtime configurations}. * <p/> * A component implementing this contract becomes auto-discoverable by adding a new entry with fully qualified name of its * implementation class name to a {@code org.glassfish.jersey.internal.spi.AutoDiscoverable} file in the {@code * META-INF/services} directory. * <p/> * Almost all Jersey {@code AutoDiscoverable} implementations have * {@link #DEFAULT_PRIORITY} {@link javax.annotation.Priority priority} set. * * @author Michal Gajdos */public interface AutoDiscoverable {
/** * Default common priority of Jersey build-in auto-discoverables. * Use lower number on your {@code AutoDiscoverable} implementation to run it before Jersey auto-discoverables * and vice versa. * 优先级,在fastjson中设置的优先级是 (DEFAULT_PRIORITY - 1),所以会优先加载。 */ public static final int DEFAULT_PRIORITY = 2000;
/** * A call-back method called when an auto-discoverable component is to be configured in a given runtime configuration scope. * <p> * Note that as with {@link javax.ws.rs.core.Feature JAX-RS features}, before registering new JAX-RS components in a * given configurable context, an auto-discoverable component should verify that newly registered components are not * already registered in the configurable context. * </p> * * @param context configurable context in which the auto-discoverable should be configured. */ public void configure(FeatureContext context);}

只读broker设计(一)

1. 初衷

pulsar中每一个topic必须由唯一的一个broker负责读和写,这样在保证消息的顺序和事务方面就很方便。但是如果出现如下场景:

  • 消费业务分布在多个地区
  • 某一个topic的消费业务特别多

单个broker处理请求就显得不能满足需求了。

因此,就可以设计一种只读broker,类似zookeeper中的observer,该种角色的broker只负责消息的读,不负责消息的写入。这样既能满足异地消费时从本地存储中拉取数据,又能横向扩展单topic多业务消费能力。

2. 解析

在pulsar的架构中,消息的处理和存储是分离的,producer和consumer和broker进行数据的交互,broker收到数据后选择对应的bookie节点进行读写。如下图:

image-20231008234840919

这种计算和存储分离的结构,虽然一定程度上会增大消息延迟,但是对于系统的整体可用性以及资源的平均分配有很大好处。

也因此在pulsar中,broker是无状态的,broker之间不需要进行信息的同步,不需要感知其他broker的状态。当一个consumer/producer连接到broker后,broker只需要通过zk判断是否应该由自己负责topic消息的读写即可(pulsar正常工作的前提也是zk是正常的)。

pulsar中每一个topic消息的读写都是由唯一的一个broker负责的(虽然有分区topic,但本质上还是拆分成独立的topic了),这样在保证消息顺序、事务、消息消费进度等方面都会变得很方便。但是就像开头说的那样,如果一个topic的消息成千上万的订阅者都需要,这样就会因broker单点造成性能的问题。

在开始实现只读功能之前,根据以往的经验,可以想到有以下几个问题需要关注:

  • 只读broker怎么获取到某个topic写的进度,怎么能实时感知。

  • 只读broker怎么保存某个subscription的消费进度。

  • 只读broker之间是否需要同步消费进度,即是否支持一个subscription从多个broker订阅。

  • 写broker是否需要消息消费进度。

  • 怎么删除已消费完的消息(之前是有写broker判断如果所有的subscription都消费完成了,则删除消息,现在写broker不知道各个只读broker的消费进度)。

  • 只读broker是单独部署还是说通过消费者发送的标识判断是否是以只读方式读取。

  • consumer连接到一个只读broker消费一定量消息后断开连接,连接到其他的只读broker,那么其他只读broker怎么获取到上次消费进度。

针对上述问题去pulsar源码中找答案,然后找一个折中的方案实现只读broker功能,首先我们先看消息订阅涉及到的几个阶段。

2.1 lookup

该阶段是为了查找topic属于哪一个broker负责,consumer传递的信息只有topic名称。

broker收到请求后,会依次执行以下逻辑:

  • 校验topic是否合法

  • 是否超过了设置的同时进行lookup的数量

  • 是否通过代理方式连接(代理方式连接需要单独校验权限)

  • topic操作是否被授权

  • 校验集群信息、权限信息

  • 计算topic属于的bundle,然后根据bundle判断属于哪一个broker,并返回broker的地址

如果实现只读broker,我们就需要在这里添加判断逻辑,返回我们给定的broker地址。

2.2 subscribe

该阶段是发送订阅请求,broker可以拿到subName。

收到订阅请求后会执行以下逻辑:

  • 权限验证

  • 客户端信息是否已被保存(broker是通过客户端生成的id来区分不同的consumer的),如果有则直接返回

  • 到BrokerService中查找该subName的topic信息,如果有则复用(topic信息由单一实例存储,例如PersistentTopic);如果没有则创建,然后会依次打开topic对应的ledger、ledger下的cursor。在这个过程中会进行topic归属broker的校验。

  • 进行schema的处理

  • 创建Subscription实例,保存订阅信息,并建立Consumer和Subscription之间的关系

  • 返回订阅成功给consumer

在此阶段中,由于在创建Topic实例的同时会打开ledger和cursor,而ledger是默认以写的方式打开新的ledger的,所以在只读broker中,需要在这个阶段添加判断逻辑,以只读的方式打卡ledger,并且不会创建新的写ledger。

2.3 ledger和cursor恢复

该阶段是通过zk中记录的数据进行topic以往ledger和cursor进度的恢复。分别对应zk中的path节点:

/managed-ledgers/tenant_c/ns1/persistent/topic01

/managed-ledgers/tenant_c/ns1/persistent/topic01/consumer_00

这里需要注意的就是怎么同步ledger的LAC到cursor中,并实时感知topic下新的data ledger的创建以及获取最新的LAC信息并同步给cursor。

2.4 客户端断开连接

该阶段会进行资源的清理,包括取消订阅信息、关闭consumer等。

这里需要关注的就是要把消费进度持久化到zk中(默认不是实时刷新到zk的,断开连接后也不会及时持久化,虽然会实时写入bk中)。然后就是需要等一个topic下所有的sub都断开连接了,才进行消费进度的保存。

3. 思路

通过不断尝试,最终确定实现方式如下

  • 设置指定的broker为只读broker,只读broker不提供写功能,并且不会启动写的功能,比如创建topic。

  • 一个subscription只能从唯一的一个broker读取消息,之间的关系会写到zk上,broker收到subscribe请求后会进行判断;连接断开后会删除zk中的关系。

  • broker在consumer发送subscribe后会从zk/bk中拉取最新的消费进度,consumer断开连接后会把进度回写到zk中。这样就避免了只读broker之间同步进度。

  • 只读broker在定时器中获取data ledger的LAC(对于closed状态的ledger,通过getLastAddConfirmed获取;对于open状态的ledger,通过readLastAddConfirmed)。当获取的LAC比记录的大时,会通过notifyCursors和notifyWaitingEntryCallBacks方法触发读操作。这种方式会造成一定的延迟,但也是可接受的。。。

  • 当一个topic的所有subscription都断开时,会关闭cursor,触发进度的持久化。

  • 当cursor恢复时,会查询zk中是否存在记录,如果存在则通过zk中的数据恢复进度(由于默认情况下cursor的进度都是在ManagedLedger实例第一次创建的时候恢复的,中间有新的cursor创建就不会有恢复流程了)。

  • 只读broker和读写broker共用一套zk环境,但是只是往里面写入subscription的消费进度。

通过上述方式,可以实现一个简单的只读broker。

后续需要改进的地方包括

  • 消费进度实时感知(当前情况下不能感知到最新一条消息,可能存在刷新延迟)。

  • 支持多topic(目前没有测试这个,估计会有问题)。

  • 支持分区topic(同上)。

  • 延迟消息(目前测试有问题)。

  • 只读broker模式下ManagedLedger的一些逻辑被注释掉了,没看懂具体功能,后续还需要继续研究。


记一次CompletableFuture引起的死锁问题

1. 背景

在pulsar中添加只读broker功能的支持,所以修改了pulsar-broker的代码,在创建cursor时添加了zk查询代码。

然后测试的时候发现,每一次重新创建topic,consumer开始订阅消息后,broker就会出现阻塞,导致subscribe阶段一直不成功。

2. 问题分析

consumer和pulsar之间进行交互的阶段有:

  • CONNECT

  • PARTITIONED_METADATA

  • LOOKUP

  • SUBSCRIBE

然后当consumer发送SUBSCRIBE后,一直收不到broker的应答,由此确认broker处理SUBSCRIBE出现问题。

2.1 定位代码

通过查看pulsar的源码,可以看到对于zk数据的查询都是异步执行的,而且共用的一个线程,如下:

// AbstractMetadataStore 构造方法
protected AbstractMetadataStore() {
this.executor = Executors
.newSingleThreadExecutor(new DefaultThreadFactory("metadata-store"));
registerListener(this);
// other code
}

// 执行查询或者回调函数
/**
* Run the task in the executor thread and fail the future if the executor is shutting down
*/
protected void execute(Runnable task, CompletableFuture<?> future) {
try {
executor.execute(task);
} catch (Throwable t) {
future.completeExceptionally(t);
}
}

所以猜测是由于回调函数没有执行完毕导致的阻塞,因此在所有的回调函数出打印日志,定位到如下代码出现阻塞:

@Override
public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();

CompletableFuture<Stat> future = new CompletableFuture<>();

try {
if (hasVersion && expectedVersion == -1) {
CreateMode createMode = getCreateMode(options);
ZkUtils.asyncCreateFullPathOptimistic(zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
createMode, (rc, path1, ctx, name) -> {
execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
// 运行到此处时出现阻塞。
future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));
} else if (code == Code.NODEEXISTS) {
// We're emulating a request to create node, so the version is invalid
future.completeExceptionally(getException(Code.BADVERSION, path));
} else {
future.completeExceptionally(getException(code, path));
}
}, future);
}, null);
} else {
// other code...
}
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
}

return future;
}

2.2 查看进程和cpu使用情况

然后查询进程和cpu使用情况:

# 找到进程ID

ps -ef |grep broker

# 查询进程cpu

top -Hp 16704

image-20240519001339974

看到cpu使用的并不多,再10%以下,由此可推断不是gc或者其他死循环造成的阻塞。

2.3 查看堆栈信息

"metadata-store-7-1" #84 prio=5 os_prio=0 tid=0x00007f1560004800 nid=0x1ca1 waiting on condition [0x00007f1506a2c000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fd0c3d18> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.recoverCursorFromZK(ManagedLedgerImpl.java:1072)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:1019)
// 阻塞原因
- locked <0x00000000f5841ce8> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:857)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:747)
at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:1050)
at org.apache.pulsar.broker.service.ServerCnx$$Lambda$277/1382611823.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$$Lambda$295/640790180.accept(Unknown Source)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
// 阻塞代码
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$null$15(ZKMetadataStore.java:234)
at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$127/153703270.run(Unknown Source)
at org.apache.pulsar.metadata.impl.AbstractMetadataStore.lambda$execute$8(AbstractMetadataStore.java:261)
at org.apache.pulsar.metadata.impl.AbstractMetadataStore$$Lambda$128/1782705605.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

"main-EventThread" #82 daemon prio=5 os_prio=0 tid=0x00007f15edd8e800 nid=0x1ca0 waiting on condition [0x00007f150692c000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000f5840660> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQue
uedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:559)

"main-SendThread(172.20.140.23:2181)" #81 daemon prio=5 os_prio=0 tid=0x00007f15edd8a800 nid=0x1c9f runnable [0x00007f150672a000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f571b3e8> (a sun.nio.ch.Util$3)
- locked <0x00000000f571b3f8> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f571b3a0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:332)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)

可以看到zk中代码出现阻塞的原因是和ManagedLedgerImpl中的代码出现了资源竞争,涉及到的代码如下:

private boolean recoverCursorFromZK(String cursorName, final OpenCursorCallback callback, final Object ctx) {
CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
store.asyncGetCursorInfo(name, cursorName, new MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedCursorInfo info, Stat stat) {
existsFuture.complete(true);
}

@Override
public void operationFailed(MetaStoreException e) {
log.info("vvv method {} open failed", "recoverCursorFromZK");
existsFuture.complete(false);
}
});

try {
// 阻塞的原因是因为这条语句
if (!existsFuture.get()) {
return false;
}
} catch (Exception e) {
log.error("vvv method recoverCursorFromZK error", e);
return false;
}

// other code

return true;
}

3. 问题解决

修改recoverCursorFromZK方法,把通过get进行任务执行完毕的判断改为confuture.thenAccept()。

4. 问题总结

其实最后也没发现为什么出现资源竞争,好像就是在一个线程中使用CompletableFuture就出现了问题。

整个代码的逻辑是:

  • 通过zk查询数据,设置回调方法等待zk查询完成后执行。

  • zk查询完成后,通过唯一的线程执行回调方法,回调方法中就是future.complete()。

  • 另外的线程开始执行同样的查询,回调方法同样是future.complete(),只不过还有其他线程调用了future.get()。

流程就像下面这种:

public void testFuture() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();

CompletableFuture<Boolean> f1 = new CompletableFuture<>();

new Thread(() -> {
CompletableFuture<Boolean> f2 = new CompletableFuture<>();
executor.execute(() -> {
sleep(1000);

System.out.println("f2 start");
f2.complete(true);
System.out.println("f2 done");
});

executor.execute(() -> {
sleep(1000);

System.out.println("f1 start");
f1.complete(true);
System.out.println("f1 done");
});
}).start();

System.out.println("init done");
f1.get();
System.out.println("run done");
}

private void sleep(int i) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}

pulsar消费进度保存(二)

本文只针对persistent topic进行说明。

1. 消息消费流程

consumer和broker之间是通过TCP进行数据交互的,通信框架基于netty,数据格式是TLV结构,分为不包含负载的简单消息和包含负载的消息。

具体参考:https://pulsar.apache.org/docs/en/develop-binary-protocol/

1.1 简单消息(Simple commands)

名称 长度 含义
totalSize 4字节 消息总长度,不包含该字段。(单个消息的长度最长为5M)
commandSize 4字节 序列化消息的长度
message 序列化消息

1.2 内容消息(Payload Message)

这种类型的消息主要用于发布和传输消息。

其中message字段并不是用户自定义的消息内容,而是pulsar进行通信时内定的各种消息,已有的消息类型参考:

https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto

pulsar中的消息以前是基于google protobuf进行序列化的,之后为了节约空间和减少代码量(protobuf会生成很多代码…)使用lightproto(https://github.com/splunk/lightproto)进行序列化和反序列化。

名称 长度 含义
totalSize 4字节 消息总长度,不包含该字段。(单个消息的长度最长为5M)
commandSize 4字节 序列化消息的长度
message 序列化消息
magicNumber 2字节 用于校验消息,固定为0x0e01。有此标识的话就会进行CRC校验。
checksum 4字节 用于校验消息完整性。对该字段后边的消息进行CRC32-C计算。具体参考:1.2.1 CRC校验
metadataSize 4字节 元数据大小
metaData 元数据内容
payload 其他内容(用户消息)

1.2.1 CRC校验

从netty接收到类型为Message的消息后,会调用handleMessage方法,然后调用consumer的messageReceived方法,在此处进行CRC校验。

调用过程如下(只是列举了方法内的主要逻辑,代码并不完整):

// PulsarDecoder -> channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
HAProxyMessage proxyMessage = (HAProxyMessage) msg;
this.proxyMessage = proxyMessage;
proxyMessage.release();
return;
}
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
try {
// De-serialize the command
int cmdSize = (int) buffer.readUnsignedInt();
cmd.parseFrom(buffer, cmdSize);
log.info("vvv_msg_type " + cmd.getType());
switch (cmd.getType()) {
case MESSAGE: {
checkArgument(cmd.hasMessage());
handleMessage(cmd.getMessage(), buffer);
break;
}
}
}
}
// ClientCnx -> handleMessage
@Override
protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayload) {
checkArgument(state == State.Ready);
if (log.isDebugEnabled()) {
log.debug("{} Received a message from the server: {}", ctx.channel(), cmdMessage);
}
ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
if (consumer != null) {
List<Long> ackSets = Collections.emptyList();
if (cmdMessage.getAckSetsCount() > 0) {
ackSets = new ArrayList<>(cmdMessage.getAckSetsCount());
for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
ackSets.add(cmdMessage.getAckSetAt(i));
}
}
consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), ackSets, headersAndPayload, this);
}
}
// ConsumerImpl -> messageReceived
void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
}
log.info("headersAndPayload=" + headersAndPayload.readableBytes());
if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
MessageMetadata msgMetadata;
try {
msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
}
// ConsumerImpl -> verifyChecksum
private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) {
if (hasChecksum(headersAndPayload)) {
int checksum = Commands.readChecksum(headersAndPayload);
int computedChecksum = Crc32cIntChecksum.computeChecksum(headersAndPayload);
if (checksum != computedChecksum) {
log.error(
"[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
topic, subscription, messageId.getLedgerId(), messageId.getEntryId(),
Long.toHexString(checksum), Integer.toHexString(computedChecksum));
return false;
}
}
return true;
}
// Commands -> hasChecksum
public static boolean hasChecksum(ByteBuf buffer) {
// magicCrc32c == 0x0e01
return buffer.getShort(buffer.readerIndex()) == magicCrc32c;
}

1.3 消息订阅和确认过程

consumer和broker的交互是基于netty进行的,业务处理handler分别是ClientCnx和ServerCnx。

通过查看代码可以了解到consumer连接到broker,消费消息并返回确认的过程中会依次发送以下几种类型的消息:

图片

CONNECT:建立TCP连接,broker端确认client的权限,确认成功后返回CONNECTED。

PATITIONED_METADATA:获取分区数据。由于分区实际上是通过虚拟topic实现的,所有在此阶段可以获取分区topic信息。

LOOKUP:通过topic名称查找消息是哪一个broker处理的,返回broker的地址。

SUBSCRIBE:consumer向返回的broker地址建立连接,发送topic、subscriptionName等信息,开始订阅消息。

FLOW:consumer端发送消息获取请求,并告知broker自己可以接收的最大消息数量。

MESSAGE:broker收到flow请求后,会从缓存或者bk中拉取最新的消息,在消息完成封装后发送给consumer。

ACK:consumer接收并处理完消息后,发送消息消费确认通知(已消费消息的messageId)。

CLOSE_CONSUMER:发送连接断开请求。

2. 订阅进度持久化

  • 每一个topic的消息都是由唯一的一个broker负责写入的,所以能比较容易保证每条消息的ID是唯一的,这个ID也是pulsar消息的消费和确认标识。

一个ID包含如下内容:

ledgerId:bk中每一个文件对应一个ledgerId,由bk维护。

entryId:在每一个ledger中递增,由bk维护。

partitionId:分区ID,由broker维护。

  • 在pulsar中,ManagedLedgerImpl用于用户消息的写入和读取,ManagedCursorImpl用于topic下每一个subscription的消费进度维护。

  • consumer收到broker发送的消息后,发送确认信息(messageId)给broker,broker收到后查找该subscription对应的ledger,如果存在则通过ledger写入到bk中;

    如果不存在,则创建一个ledger并把ledger信息写入到zk中,然后通过创建后的ledger写入bk中。

  • zk中写入的数据包含cursorLedgerId、deletedLedgerId、deletedEntryId等信息,如下:

cursorsLedgerId=-1, markDeleteLedgerId=603, markDeleteEntryId=15, lastActive=1629692232349
upper ledgerId=603, upper entryId=18, lower ledgerId=603, lower entryId=16
upper ledgerId=37740, upper entryId=2, lower ledgerId=37740, lower entryId=1
upper ledgerId=37740, upper entryId=4, lower ledgerId=37740, lower entryId=3

uppper和lower用于处理不连续确认的情况。

而且,只有最新一个ledger的最新一个entry信息是有效的,broker初始化cursor信息时只会加载最新ledger的最近一条确认消息的entry中最新一条数据。

  • bk中存储的数据内容:
lowLedgerId=603, lowEntryId=16, upperLedgerId=603, upperEntryId=18
lowLedgerId=37740, lowEntryId=1, upperLedgerId=37740, upperEntryId=2
lowLedgerId=37740, lowEntryId=3, upperLedgerId=37740, upperEntryId=4
lowLedgerId=37740, lowEntryId=5, upperLedgerId=37740, upperEntryId=6

可以看到bk中数据主要记录确认情况,zk中记录确认情况在哪一个ledger以及哪些数据被标记为可删除了(被成功消费了)。

  • 对于不连续确认的消息,会记录消息确认范围信息。从low - upper之前的所有消息都被消费成功了(左开右闭)。

  • 因broker停掉或者其他原因导致cursor被关闭后,会把进度持久化到zk或者bk中。满足以下条件会保存到bk中:


private boolean shouldPersistUnackRangesToLedger() {
return cursorLedger != null
&& !isCursorLedgerReadOnly
&& config.getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.size() > config.getMaxUnackedRangesToPersistInZk();
}

2.1 zk中消费进度解析

当客户端连接后,要获取该subscription的cursor信息,先从zk中查询,path如下:

/managed-ledgers/{tenant}/{namespace}/persistent/{topic}/{subscriptionName}

public void testZKMetaDataCursorInfo() throws Exception {
String path = "/managed-ledgers/tenant_c/ns1/persistent/topic_cursor/consumer_002";
byte[] bytes = getValue(path);
if (bytes == null) {
System.out.println("bytes is null");
return;
}
MLDataFormats.ManagedCursorInfo info = MLDataFormats.ManagedCursorInfo.parseFrom(bytes);
StringBuilder sb = new StringBuilder();
sb.setLength(0);
sb.append("cursorsLedgerId=").append(info.getCursorsLedgerId());
sb.append(", markDeleteLedgerId=").append(info.getMarkDeleteLedgerId());
sb.append(", markDeleteEntryId=").append(info.getMarkDeleteEntryId());
sb.append(", lastActive=").append(info.getLastActive());
if (info.getIndividualDeletedMessagesCount() > 0) {
for (MLDataFormats.MessageRange range : info.getIndividualDeletedMessagesList()) {
sb.append("\n");
sb.append(" upper ledgerId=").append(range.getUpperEndpoint().getLedgerId());
sb.append(", upper entryId=").append(range.getUpperEndpoint().getEntryId());
sb.append(", lower ledgerId=").append(range.getLowerEndpoint().getLedgerId());
sb.append(", lower entryId=").append(range.getLowerEndpoint().getEntryId());
}
}
System.out.println("debug vv " + sb);
System.out.println("done");
}

2.2 bk中消费进度解析

如果从zk中查询的数据中,cursorLedger不等于-1,则会从bk中查询该ledger的数据,恢复进度。

private void parsePulsarCursor(StringBuilder sb, LedgerEntry entry) throws Exception {
MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntryBytes());
// sb.append(", ledgerId=").append(positionInfo.getLedgerId());
// sb.append(", entryId=").append(positionInfo.getEntryId());
PositionImpl position = new PositionImpl(positionInfo);
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
positionInfo.getIndividualDeletedMessagesList().forEach(messageRange -> {
sb.setLength(0);
MLDataFormats.NestedPositionInfo point = messageRange.getLowerEndpoint();
sb.append(", lowLedgerId=").append(point.getLedgerId());
sb.append(", lowEntryId=").append(point.getEntryId());
point = messageRange.getUpperEndpoint();
sb.append(", upperLedgerId=").append(point.getLedgerId());
sb.append(", upperEntryId=").append(point.getEntryId());
log.info(sb.toString());
});
} else {
log.info("-----------------");
}
if (positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
positionInfo.getBatchedEntryDeletionIndexInfoList().forEach(batchDeletedIndexInfo -> {
sb.setLength(0);
if (batchDeletedIndexInfo.getDeleteSetCount() > 0) {
for (int i = 0; i < batchDeletedIndexInfo.getDeleteSetList().size(); i++) {
long indexId = batchDeletedIndexInfo.getDeleteSetList().get(i);
sb.append(", indexId=").append(indexId);
}
}
log.info(sb.toString());
});
}
log.info("=================");
}

3. 订阅进度恢复流程

订阅进度保存在zk和bk中,这些信息会在broker加载topic信息的时候被恢复。

当producer和consumer连接到broker时,都会调用BrokerService的getTopic方法(分别在ServerCnx的handleProducer和handleSubscribe方法)。

由于brokerService是单例的,所以consumer和producer共用一组topic信息,然后通过topic实例关联ledger、cursor等信息。

在BrokerService中,topic实例创建过程:

  • 是否已经创建,如果是则直接返回。
  • 该broker是否有topic的拥有权,如果没有则返回异常。
  • 对创建过程进行加锁,获取到锁后开始创建topic实例(同一时刻,一个topic仅有一个创建任务),由如下参数控制并发创建数量:
return topics.computeIfAbsent(topic, (topicName) -> {
return this.loadOrCreatePersistentTopic(topicName, createIfMissing);
});
# Max number of concurrent topic loading request broker allows to control number of zk-operations
# 限制获取topic信息时,zk并发操作数量
maxConcurrentTopicLoadRequest=5000

  • 在后续每一个流程中都会校验该topic是否由该broker负责。
  • 判断ns中topic数量是否超过最大值,如果是则返回。
  • 打开ledger。
// BrokerService -> createPersistentTopic
// Once we have the configuration, we can proceed with the async open operation
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, new OpenLedgerCallback())...
  • 初始化ledger、bookeeper、cursor。
  • 从zk中获取ledger信息。
// ManagedLedgerImpl -> synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx)
store.getManagedLedgerInfo(name, config.isCreateIfMissing(), new MetaStoreCallback<ManagedLedgerInfo>() {

从zk中获取cursor信息。

// ManagedLedgerImpl -> private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback)store.getCursors(name, new MetaStoreCallback<List<String>>() {...

恢复cursor内容。

void recover(final VoidCallback callback) {
// Read the meta-data ledgerId from the store
log.info("[{}] Recovering from bookkeeper ledger cursor: {}", ledger.getName(), name);
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerStat = stat;
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
// 如果zk中的cursorLedger是-1,则说明不需要从ledger中查询订阅进度,直接从zk中加载进度信息即可。
if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
// closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
PositionImpl recoveredPosition = new PositionImpl(info.getMarkDeleteLedgerId(),
info.getMarkDeleteEntryId());
if (info.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(info.getIndividualDeletedMessagesList());
}
Map<String, Long> recoveredProperties = Collections.emptyMap();
if (info.getPropertiesCount() > 0) {
// Recover properties map
recoveredProperties = Maps.newHashMap();
for (int i = 0; i < info.getPropertiesCount(); i++) {
LongProperty property = info.getProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}
recoveredCursor(recoveredPosition, recoveredProperties, null);
callback.operationComplete();
} else {
// 需要从bk中加载消费进度信息。
// Need to proceed and read the last entry in the specified ledger to find out the last position
log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
info.getCursorsLedgerId());
recoverFromLedger(info, callback);
}
}
@Override
public void operationFailed(MetaStoreException e) {
callback.operationFailed(e);
}
});
}

  • 从ledger中加载进度信息。
protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallback callback) {
// Read the acknowledged position from the metadata ledger, then create
// a new ledger and write the position into it
ledger.mbean.startCursorLedgerOpenOp();
long ledgerId = info.getCursorsLedgerId();
OpenCallback openCallback = (rc, lh, ctx) -> {
...
// Read the last entry in the ledger
// 读取最新一个位置
long lastEntryInLedger = lh.getLastAddConfirmed();

...

lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
...

// 读取最后写入ledger的entry
LedgerEntry entry = seq.nextElement();
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
} catch (InvalidProtocolBufferException e) {
callback.operationFailed(new ManagedLedgerException(e));
return;
}
// 加载属性信息
Map<String, Long> recoveredProperties = Collections.emptyMap();
if (positionInfo.getPropertiesCount() > 0) {
// Recover properties map
recoveredProperties = Maps.newHashMap();
for (int i = 0; i < positionInfo.getPropertiesCount(); i++) {
LongProperty property = positionInfo.getProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}
PositionImpl position = new PositionImpl(positionInfo);
// 如果有单独确认的消息(为了应对不是连续确认的情况)。
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, lh);
callback.operationComplete();
}, null);
};
// 打开一个新的ledger,并把进度信息写入新ledger中。
try {
bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null);
} catch (Throwable t) {
log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
ledger.getName(), ledgerId, name, t);
openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null);
}
}
  • 从ledger中加载单独确认的消息。

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
lock.writeLock().lock();
try {
individualDeletedMessages.clear();
individualDeletedMessagesList.forEach(messageRange -> {
MLDataFormats.NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint();
MLDataFormats.NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint();
// 已确认的消息都在一个ledger内。
if (lowerEndpoint.getLedgerId() == upperEndpoint.getLedgerId()) {
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
} else {
// 已确认的消息在不同的ledger内,这时候需要加载每个ledger中已确认的消息。
// Store message ranges after splitting them by ledger ID
LedgerInfo lowerEndpointLedgerInfo = ledger.getLedgersInfo().get(lowerEndpoint.getLedgerId());
if (lowerEndpointLedgerInfo != null) {
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
lowerEndpoint.getLedgerId(), lowerEndpointLedgerInfo.getEntries() - 1);
} else {
log.warn("[{}][{}] No ledger info of lower endpoint {}:{}", ledger.getName(), name,
lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId());
}
// 加载顺序是:
// (lowerLedgerId, lowerEntryId) -> (lowerLedgerId, -1) 第一条
// (lowerLedgerId, -1) -> (lowerLedgerId, -1) 中间部分
// (upperLedgerId, -1) -> (upperLedgerId, upperEntryId) 最后一条
for (LedgerInfo li : ledger.getLedgersInfo()
.subMap(lowerEndpoint.getLedgerId(), false, upperEndpoint.getLedgerId(), false).values()) {
individualDeletedMessages.addOpenClosed(li.getLedgerId(), -1, li.getLedgerId(),
li.getEntries() - 1);
}
individualDeletedMessages.addOpenClosed(upperEndpoint.getLedgerId(), -1,
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
}
});
} finally {
lock.writeLock().unlock();
}
}
  • 进度恢复完毕。

pulsar消费进度研究(一)

在pulsar中,使用ManagedCursorImpl来记录每个订阅的消息消费进度信息,每个cursor对应一个subscription。cursor的信息会记录到zookeeper中,同时pulsar也会把ack信息持久化到bookkeeper中。

通过阅读代码,了解到以下内容:

1.从zk中/managed-ledgers/tenant_c/ns1/persistent/storeV3/consumer_002中查询cursor信息,保存在对象ManagedCursorInfo中,主要包含字段:


message ManagedCursorInfo {
// If the ledger id is -1, then the mark-delete position is
// the one from the (ledgerId, entryId) snapshot below
required int64 cursorsLedgerId = 1;
// Last snapshot of the mark-delete position
optional int64 markDeleteLedgerId = 2;
optional int64 markDeleteEntryId = 3;
repeated MessageRange individualDeletedMessages = 4;
// Additional custom properties associated with
// the current cursor position
repeated LongProperty properties = 5;
optional int64 lastActive = 6;
// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}

2.如果cursorsLedgerId字段为-1,表示记录确认信息的ledger已经被关闭,消费进度是markDeleteLedgerId和markDeleteEntryId字段。

3.如果cursorsLedgerId存在,则从记录确认消息的ledger中读取内容。

4.从ledger中获取最开始一次添加并确认的消息ID。bk中存储的内容格式如下:

message PositionInfo {
required int64 ledgerId = 1;
required int64 entryId = 2;
repeated MessageRange individualDeletedMessages = 3;
// Additional custom properties associated with
// the current cursor position
repeated LongProperty properties = 4;
// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
}

5.如果有单独确认的消息,会读取内容并保存到individualDeletedMessages中。

broker重启后,消费者发送订阅请求,ledger和cursor恢复过程

ServerCnx:handleSubscribe

校验topic、认证

BrokerService:getTopic

如果topic存在,则从缓存中返回topic对象

如果topic不存在,且是持久化的topic,则调用loadOrCreatePersistentTopic加载或者创建一个topic对象

BrokerService:createPersistentTopic

获取该topic的ledger配置信息,并创建一个ledger

ManagedLedgerFactoryImpl:asyncOpen

如果ledger没有打开过,则创建一个新的ledger,并调用该ledger的initialize方法

ManagedLedgerImpl:initialize

从zk中获取ledger信息,初始化bk信息,初始化cursor信息。

ManagedLedgerImpl:initializeCursors

从zk中获取topic下有哪些订阅(/managed-ledgers/tenant_c/ns1/persistent/topic_cursor的children),然后依次初始化每一个sub。

针对每一个subscription创建一个cursor对象,同时对该cursor进行恢复。

从zk中获取cursor的信息,如果cursor的ledgerId为-1,则记录MarkDeletedLedgerId和MarkDeletedEntryId为订阅进度;

如果cursor的ledgerId不是-1,则开始读取bk中的进度数据,读取最新一条消息的entry,保存该entry的信息为订阅进度。

broker打印日志如下

# 权限验证
14:47:56.559 [pulsar-io-4-1] INFO auth.server.VVPulsarAuthorizationProvider - vv_auth_v2 allowTopicOperationAsync, topicName persistent://tenant_c/ns1/topic_cursor, role vv-role
# 执行lookup阶段
14:47:56.561 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - vvv_try_to_lookup,topicName: persistent://tenant_c/ns1/topic_cursor, requestId: 2
# 权限验证
14:47:56.563 [pulsar-io-4-1] INFO auth.server.VVPulsarAuthorizationProvider - vv_auth_v2 allowTopicOperationAsync, topicName persistent://tenant_c/ns1/topic_cursor, role vv-role
# 权限验证
14:47:56.644 [pulsar-io-4-1] INFO auth.server.VVPulsarAuthorizationProvider - vv_auth_v2 allowTopicOperationAsync, topicName persistent://tenant_c/ns1/topic_cursor, role vv-role
# consumer发送subscribe消息(注意的是创建ledger在subscribe之后,)
14:47:56.644 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - vvv_subscribe, consumer_002 Subscribing on topic persistent://tenant_c/ns1/topic_cursor
14:47:56.646 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.21.32.64:59676] Subscribing on topic persistent://tenant_c/ns1/topic_cursor / consumer_002
# 这里由于broker刚重启,没有加载ledger信息,所以在consumer连接后才创建ledger对象
14:47:56.640 [pulsar-ordered-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - vv_cc asyncOpen, name=tenant_c/ns1/persistent/topic_cursor
# 创建ledger
14:47:56.667 [pulsar-ordered-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger tenant_c/ns1/persistent/topic_cursor
# 从zk中获取ledger信息
14:47:56.668 [pulsar-ordered-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - vvv_debug /managed-ledgers/tenant_c/ns1/persistent/topic_cursor
14:47:56.722 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[116, 101, 110, 97, 110, 116, 95, 99, 47, 110, 115, 49, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 116, 111, 112, 105, 99, 95, 99, 117, 114, 115, 111, 114], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
# ledger创建成功
14:47:56.761 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Created ledger 603
# 开始加载cursor信息
14:47:56.778 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Loading cursor consumer_002
14:47:56.792 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [tenant_c/ns1/persistent/topic_cursor] Recovering from bookkeeper ledger cursor: consumer_002
# 从zk中获取之前保存的cursor信息,主要包含cursorLedgerId、markDeletedLedgerId和markDeletedEntryId
14:47:56.792 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - vv_cc asyncGetCursorInfo, ledgerName=tenant_c/ns1/persistent/topic_cursor, cursorName=consumer_002, path=/managed-ledgers/tenant_c/ns1/persistent/topic_cursor/consumer_002
# 开始恢复cursor
14:47:56.798 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [tenant_c/ns1/persistent/topic_cursor] Cursor consumer_002 recovered to position 56:9
14:47:56.800 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Recovery for cursor consumer_002 completed. pos=56:9 -- todo=0
# cursor的恢复是在创建ledger的过程中进行的,cursor恢复后则ledger创建过程结束。
14:47:56.801 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [tenant_c/ns1/persistent/topic_cursor] Successfully initialize managed ledger
14:47:56.808 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.pulsar.broker.service.AbstractTopic - Disabling publish throttling for persistent://tenant_c/ns1/topic_cursor
14:47:56.828 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://tenant_c/ns1/topic_cursor] There are no replicated subscriptions on the topic
14:47:56.837 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://tenant_c/ns1/topic_cursor - dedup is disabled
14:47:56.843 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=tenant_c/ns1/persistent/topic_cursor, name=consumer_002, ackPos=56:9, readPos=56:10} to 603:-1 since ledger consumed completely
14:47:56.847 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Ledger 56 contains the current last confirmed entry 56:9, and it is going to be deleted
14:47:56.858 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] End TrimConsumedLedgers. ledgers=1 totalSize=0
14:47:56.858 [bookkeeper-ml-scheduler-OrderedScheduler-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [tenant_c/ns1/persistent/topic_cursor] Removing ledger 56 - size: 673
14:47:57.251 [ForkJoinPool.commonPool-worker-5] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - vv_cc asyncOpenCursor, cursorName=consumer_002
14:47:57.262 [ForkJoinPool.commonPool-worker-5] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [tenant_c/ns1/persistent/topic_cursor-consumer_002] Rewind from 56:10 to 56:10
14:47:57.263 [ForkJoinPool.commonPool-worker-5] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://tenant_c/ns1/topic_cursor] There are no replicated subscriptions on the topic
14:47:57.263 [ForkJoinPool.commonPool-worker-5] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://tenant_c/ns1/topic_cursor][consumer_002] Created new subscription for 0
14:47:57.264 [ForkJoinPool.commonPool-worker-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.21.32.64:59676] Created subscription on topic persistent://tenant_c/ns1/topic_cursor / consumer_002

vim命令

文本信息配置

set nu(写入配置文件.vimrc中) 显示行号。

ctrl-g(正常模式下使用命令) 显示文件信息和当前行。

statusline.vim(插件) 增加底部状态栏。


代码缩进配置

filetype on(写入配置文件.vimrc中) 使vim对文件类型敏感。

autocmd FileType c,cpp,h :setlocal cindent cinoptions=:0,g0,t0(写入配置文件.vimrc中) 设置c缩进风格,具体详情参见:h cinoptions-values。

autocmd FileType c,cpp,h :setlocal et sta sw=4 sts=4 tabstop=4(写入配置文件.vimrc中) 设置一次缩进的距离是4个空格。

=(正常模式或者可视模式下使用命令) 按照缩进风格排版代码。

<(可视模式下使用命令) 向左缩进一级。

>(可视模式下使用命令) 向右缩进一级。


语法高亮

filetype plugin on(写入配置文件.vimrc中) 通常安装的vim中已经加入了各种语言的语法高亮插件,我们只需要打开文件类型相关的插件就可以了。


快速定位

gd(正常模式下使用命令) 跳转到局部变量定义处。

gD(正常模式下使用命令) 跳转到全局标量定义处。

*(正常模式下使用命令) 搜索并跳到下一个光标所在的单词。

g(正常模式下使用命令) 功能接近,但是查找的目标不带<和>单词分界符号。

#(正常模式下使用命令) 搜索并跳到上一个光标所在的单词。

g#(正常模式下使用命令) 功能接近#,但是查找的目标不带<和>单词分界符号。


标记位置

m{a-zA-Z0-9} (正常模式下使用命令) 在当前位置制作一个标记,标记名字可以使用a-z或者A-Z之间的任意字符,例如输入ma,就是将当前位置标记为a标记。注意a-z为单文件标记,不可跨文件使用,而A-Z0-9为全局标记,可以跨文件使用。

‘{a-zA-Z0-9} (正常模式下使用命令) 跳转到本文件的标记上。

:marks [{a-zA-z0-9}] (正常模式下使用命令) 查看指定标记的内容,不写标记号则查看所有。

:delm {a-zA-z0-9} (正常模式下使用命令) 删除一个标记。


全文搜索

:vimgrep(正常模式下使用命令) 全文搜索,功能同grep命令,但是支持在vim进行多文件跳转定位。

使用方法:vimgrep 正则表达式 文件。文件支持通配符,例如*.c代表所有的.c文件。如果希望递归搜索,可以使用**/*,表示搜索所有的文件。

:cl 列举结果

:cc(正常模式下使用命令) 当前结果

:cn(正常模式下使用命令) 下一个结果

:cp(正常模式下使用命令) 上一个结果

:cw(正常模式下使用命令) 重新打开搜索结果窗口


多文件编辑

vim f1 f2 …… fn(在shell下使用命令) 打开多个文件。

:e 文件名(正常模式下使用命令) 在vim中打开新文件。

:ls(正常模式下使用命令) 所有打开文件列表。

:bn(正常模式下使用命令) 到下一个文件。

:bp(正常模式下使用命令) 到上一个文件。

:b# 或 ctrl-6(正常模式下使用命令) 到最近的前一个文件。

set autowriteall(写入配置文件.vimrc中) 如果讨厌每次打开新文件,vim喋喋不休的要求你保存,那么可以设置自动保存。


多窗口编辑

:sp 文件名(正常模式下使用命令) 横向拆分窗口(多行窗口)。

:vsp 文件名(正常模式下使用命令) 纵向拆分窗口(多列窗口)。

ctrl-w h(正常模式下使用命令) 将光标移动到左一个窗口。

ctrl-w j(正常模式下使用命令) 将光标移动到下一个窗口。

ctrl-w k(正常模式下使用命令) 将光标移动到上一个窗口。

ctrl-w l(正常模式下使用命令) 将光标移动到右一个窗口。

ctrl-w +(正常模式下使用命令) 当前窗口尺寸变大。

ctrl-w -(正常模式下使用命令) 当前窗口尺寸变小。

ctrl-w o(正常模式下使用命令) 只显示当前窗口。


函数列表

taglist.vim(插件) 列表插件。

let Tlist_Use_Right_Window = 1(写入配置文件.vimrc中) 如果希望列表在右侧显示,则加入这个配置,默认是左侧。

:Tlist(正常模式下使用命令) 显示函数列表。

d(在taglist窗口下使用) 从列表中删除文件。

+(在taglist窗口下使用) 展开文件。

-(在taglist窗口下使用) 折叠文件。

=(在taglist窗口下使用) 折叠所有文件。

x(在taglist窗口下使用) 显示或隐藏正常窗口。


文件列表

NERD_tree.vim(插件) 横向拆分窗口(多行窗口)。

let NERDTreeWinPos = ‘right’(写入配置文件.vimrc中) 如果希望文件树在右侧显示,则加入这个配置,默认是左侧。

:NERDTree(正常模式下使用命令) 显示文件列表。


函数跳转

ctags(软件,需要另外安装) 生成多种语言tag文件的软件。

Ctags –R *.c(在shell下使用命令) 生成当前路径所有.c文件的tag,-R代表递归。

:ta 标记 或 [g] ctrl-] 列举标签(多个标签)或者跳转到标签(单个标签)。

ctrl-t 返回上一级。

:tags 列出标签栈。


语法错误

:make(正常模式下使用命令) 执行外部make命令,并且显示所有的编译警告和错误,并且可以在vim中定位。

:cl 列举结果

:cc(正常模式下使用命令) 当前结果

:cn(正常模式下使用命令) 下一个结果

:cp(正常模式下使用命令) 上一个结果

:cw(正常模式下使用命令) 重新打开搜索结果窗口


二进制文件查看 (鉴于有同学理解出现分歧 将二进制查看更新为二进制文件查看)

:范围!xxd(正常模式下使用命令) 把指定范围的部分转化为二进制文件阅读方式。

:范围!xxd -r(正常模式下使用命令) 把指定范围的部分转回字符阅读方式。


字符集的编码

let &termencoding=&encoding

set fileencodings=utf-8,gbk,cp936(写入配置文件.vimrc中)

加入多种字符编码支持。

:set fileencoding(正常模式下使用命令) 设定文件字符编码。


补充

ctrl-p(插入模式下使用命令) 跳出补全菜单。

ctrl-n(在跳出补全菜单后) 下一个结果。

ctrl-p(在跳出补全菜单后) 上一个结果。

ctrl-y(在跳出补全菜单后) 选择当前结果。

ctrl-x ctrl-f(插入模式下使用命令) 文件名补全。

ctrl-x ctrl-i(插入模式下使用命令) 包含的头文件。

ctrl-x ctrl-](插入模式下使用命令) ctags(将在下文中介绍)符号补全。

ctrl-x ctrl-o(插入模式下使用命令) omni补全,需要设置omnifunc变量。


vim其他命令 (参考vi(vim)教程)

打开文件、保存、关闭文件:

vi filename //打开filename文件

:w    //保存文件

:w vpser.net //保存至vpser.net文件

:q    //退出编辑器,如果文件已修改请使用下面的命令

:q!    //退出编辑器,且不保存

:wq //退出编辑器,且保存文件

插入文本或行:

a //在当前光标位置的右边添加文本

i //在当前光标位置的左边添加文本

A //在当前行的末尾位置添加文本

I //在当前行的开始处添加文本(非空字符的行首)

O //在当前行的上面新建一行

o //在当前行的下面新建一行

R //替换(覆盖)当前光标位置及后面的若干文本

J //合并光标所在行及下一行为一行(依然在命令模式)

移动光标:

使用上下左右方向键

命令模式下:h 向左、j 向下 、k 向上、l 向右。

空格键 向右、Backspace 向左、Enter 移动到下一行首、- 移动到上一行首。

删除:

x //删除当前字符

nx //删除从光标开始的n个字符

dd //删除当前行

ndd //向下删除当前行在内的n行

u //撤销上一步操作

U //撤销对当前行的所有操作

搜索:

/vpser //向光标下搜索vpser字符串

?vpser //向光标上搜索vpser字符串

n //向下搜索前一个搜素动作

N //向上搜索前一个搜索动作

跳转:

n+ //向下跳n行

n- //向上跳n行

nG //跳到行号为n的行

G //跳至文件的底部

设置行号:

:set nu //显示行号

:set nonu //取消显示行号

复制:

yy //将当前行复制到缓存区,也可以用 “ayy 复制,”a 为缓冲区,a也可以替换为a到z的任意字母,可以完成多个复制任务。

nyy //将当前行向下n行复制到缓冲区,也可以用 “anyy 复制,”a 为缓冲区,a也可以替换为a到z的任意字母,可以完成多个复制任务。

yw //复制从光标开始到词尾的字符。

nyw //复制从光标开始的n个单词。

y^ //复制从光标到行首的内容。

y$ //复制从光标到行尾的内容。

p //粘贴剪切板里的内容在光标后,如果使用了前面的自定义缓冲区,建议使用”ap 进行粘贴。

P //粘贴剪切板里的内容在光标前,如果使用了前面的自定义缓冲区,建议使用”aP 进行粘贴。

替换:

:s/old/new //用new替换行中首次出现的old

:s/old/new/g //用new替换行中所有的old

:n,m s/old/new/g //用new替换从n到m行里所有的old

:%s/old/new/g //用new替换当前文件里所有的old

编辑其他资源:

:e otherfilename //编辑文件名为otherfilename的文件。

修改文件格式:

:set fileformat=unix //将文件修改为unix格式,如win下面的文本文件在linux下会出现^M。