在介绍消息的生命周期之前,先看一下 pulsar 的架构
一、pulsar 集群
1. Brokers + Bookies
broker 是各个组件之间进行交互的对象。pulsar 是分层架构模式,使用 Bookkeeper 作为额外的存储系统, bookies 就是 Bookkeeper 里的存储节点。
Brokers + Bookies 构成 pulsar 的两个层次, 共同完成 pulsar 的数据服务
Broker: 整个消息层的生产和消费,无存储状态
Bookie: 数据持久化保存的节点,有存储状态
2. Zookeeper
Zookeeper 在 Pulsar 里的作用是存储 Pulsar 系统里的元数据和集群的管理以及节点的发现等,节点发现就是发现集群中有哪些 broker 哪些 bookie 。
以上的三个组件,构成了 Pulsar 的集群。Brokers + bookies 为数据服务,Zookeeper 为元数据服务。
二、message lifecycle
1. producer 生产消息(写过程)
producer 通过生产消息到一个 topic, 一个 topic 中可能有 N 个 partition, 每个 partition 给一个 broker 服务。
生产者内部有一个 MessageWriter 的类, 这个 MessageWriter 默认是 round-robin 的过程,就是发送每条消息的时候回去轮训找到一个 partition 进行发送,但为了提高效率,在一段时间内只会选择一个 partition 进行发送。如果 message 指定了 key,那么会根据 key 的hash去找到对应的 partition 进行发送。
之后 Broker 收到消息后会调用 BookKeeper 的客户端并发去写多个 bookie 副本。当 broker 收到一定数量的 ACK 后,他会认为消息已经写入成功,broker 返回客户端,告知这条消息已经被持久化完成。
整个写的过程消息是从 producer 到 broker,broker 到 Bookkeeper 上。 整个过程中客户端都不会跟Zookeeper打交道,也不会和 Bookkeeper打交道, 只和 Broker 打交道。
2. consumer 消费消息(读过程)
2.1 broker 有缓存的情况下
Broker 可能已经缓存了部分信息,consumer 在连接到 broker 后建立长链接, broker 把消息从内存里拿出来通过推得方式 dispatch 给 consumer, consumer 收到消息后会放到消费端的 receiver queue 中,consumer 就可以消费了,完了发送确认ack给broker。
2.2 broker 没有缓存的情况下
broker 没有缓存这部分数据,需要去 bookie 去读取数据, 数据读取出来后再 dispatch 给 consumer,读取是选择任意一个存储节点读取的,整个存储架构没有主节点的说法。
3. Data Retention
3.1 Subscription Initial Position
之前整理了 subscription 和 cursor 的概念。如果有个新的订阅是pulsar中没有的,那么如何创建 cursor?
这里就有两个概念
earliest:放到整个流中第一条有效数据
latest: 放到整个流中最后一条有效数据 (默认的)
Cursor 放置的位置,决定了最终消费了什么数据
3.2 Message Retention
消费位置最早的订阅决定了你能保留消息多久,订阅之前的消息可以被删除。这是 pulsar 的默认行为, 即消费完就可以被删除, 释放空间留给之后的消息使用。
由于流计算的需求,有些数据消费完还不能删除,需要再额外保留个三五天。就需要 retention 来进行数据的保留设置
添加后 retention 后, 紫色部分的内容就是该保留的数据,可以配置多大内存/多少天。但前提是被所有订阅消费完了
3.3 TTL
还有一种情况就是,生产者一直在生产消息,但是消费者一直没有处理,那么消息永远不会被确认,那么这个消息会被一直累积。
为了保障消息不会被一只累加下去,可以在这写橘色消息部分加上TTL,TTL作用范围是没被确认的消息。
在TTL之后,消息会到Retention中,然后再经理过Retention中设置的时间,进行最后的数据清除。
3.4 Message Deletion
正常理解是,消息过期后就会被删除,但是在pulsar中,消息的删除是按照 片(segment) 进行的。
例如上图,删除的是 S1 这个分片,而S2 不会被删除,因为只有分片中所有的消息是待删除状态,才会去删除这个分片。
3.5 storage size
storage size是计算所有没被删除的 segment 所占用的存储空间。
整个存储空间是按照 segment 之间的存储力度进行计算的,同时 garbage collector 是定时执行的,所有有时候可以发现 segement 已经被清空了,但是 storage size 仍然没有变化。