pulsar messager lifecycle

在介绍消息的生命周期之前,先看一下 pulsar 的架构

一、pulsar 集群

1. Brokers + Bookies

broker 是各个组件之间进行交互的对象。pulsar 是分层架构模式,使用 Bookkeeper 作为额外的存储系统, bookies 就是 Bookkeeper 里的存储节点。

Brokers + Bookies 构成 pulsar 的两个层次, 共同完成 pulsar 的数据服务

Broker: 整个消息层的生产和消费,无存储状态

Bookie: 数据持久化保存的节点,有存储状态

2. Zookeeper

Zookeeper 在 Pulsar 里的作用是存储 Pulsar 系统里的元数据和集群的管理以及节点的发现等,节点发现就是发现集群中有哪些 broker 哪些 bookie 。

以上的三个组件,构成了 Pulsar 的集群。Brokers + bookies 为数据服务,Zookeeper 为元数据服务。

image-20210611184242420

二、message lifecycle

1. producer 生产消息(写过程)

producer 通过生产消息到一个 topic, 一个 topic 中可能有 N 个 partition, 每个 partition 给一个 broker 服务。

生产者内部有一个 MessageWriter 的类, 这个 MessageWriter 默认是 round-robin 的过程,就是发送每条消息的时候回去轮训找到一个 partition 进行发送,但为了提高效率,在一段时间内只会选择一个 partition 进行发送。如果 message 指定了 key,那么会根据 key 的hash去找到对应的 partition 进行发送。

之后 Broker 收到消息后会调用 BookKeeper 的客户端并发去写多个 bookie 副本。当 broker 收到一定数量的 ACK 后,他会认为消息已经写入成功,broker 返回客户端,告知这条消息已经被持久化完成。

整个写的过程消息是从 producer 到 broker,broker 到 Bookkeeper 上。 整个过程中客户端都不会跟Zookeeper打交道,也不会和 Bookkeeper打交道, 只和 Broker 打交道。

2. consumer 消费消息(读过程)
2.1 broker 有缓存的情况下

Broker 可能已经缓存了部分信息,consumer 在连接到 broker 后建立长链接, broker 把消息从内存里拿出来通过推得方式 dispatch 给 consumer, consumer 收到消息后会放到消费端的 receiver queue 中,consumer 就可以消费了,完了发送确认ack给broker。

2.2 broker 没有缓存的情况下

broker 没有缓存这部分数据,需要去 bookie 去读取数据, 数据读取出来后再 dispatch 给 consumer,读取是选择任意一个存储节点读取的,整个存储架构没有主节点的说法。

3. Data Retention
3.1 Subscription Initial Position

之前整理了 subscription 和 cursor 的概念。如果有个新的订阅是pulsar中没有的,那么如何创建 cursor?

这里就有两个概念

earliest:放到整个流中第一条有效数据

latest: 放到整个流中最后一条有效数据 (默认的)

Cursor 放置的位置,决定了最终消费了什么数据

image-20210611195453885

3.2 Message Retention

消费位置最早的订阅决定了你能保留消息多久,订阅之前的消息可以被删除。这是 pulsar 的默认行为, 即消费完就可以被删除, 释放空间留给之后的消息使用。

由于流计算的需求,有些数据消费完还不能删除,需要再额外保留个三五天。就需要 retention 来进行数据的保留设置

image-20210611200055467

添加后 retention 后, 紫色部分的内容就是该保留的数据,可以配置多大内存/多少天。但前提是被所有订阅消费完了

3.3 TTL

还有一种情况就是,生产者一直在生产消息,但是消费者一直没有处理,那么消息永远不会被确认,那么这个消息会被一直累积。

为了保障消息不会被一只累加下去,可以在这写橘色消息部分加上TTL,TTL作用范围是没被确认的消息。

image-20210611200532518

在TTL之后,消息会到Retention中,然后再经理过Retention中设置的时间,进行最后的数据清除。

3.4 Message Deletion

正常理解是,消息过期后就会被删除,但是在pulsar中,消息的删除是按照 片(segment) 进行的。

image-20210611200921982

例如上图,删除的是 S1 这个分片,而S2 不会被删除,因为只有分片中所有的消息是待删除状态,才会去删除这个分片。

3.5 storage size

storage size是计算所有没被删除的 segment 所占用的存储空间。

整个存储空间是按照 segment 之间的存储力度进行计算的,同时 garbage collector 是定时执行的,所有有时候可以发现 segement 已经被清空了,但是 storage size 仍然没有变化。

Author: iMine
Link: https://imine141.github.io/2021/06/11/pulsar/pulsar%20messager%20lifecicle/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.