读取和解析pulsar保存在bookkeeper上的消息

Pulsar 使用 BookKeeper(下文简称 BK)对消息进行持久化。为了实现只读 broker,需要绕过 Pulsar broker,直接连接 BK 读取消息。在 BK 中,ledger 是一个日志段,BK 以 ledger 为粒度将数据复制到多个 bookie 上(多副本存储);每个 ledger 包含多个 entry,每个 entry 对应 Pulsar 中的一条消息,或者(批量发送时)一批消息。

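Pulsar 每个 topic 在 BK 中对应哪些 ledger,这一存储元数据保存在 ZooKeeper(下文简称 zk)中,路径形如 /managed-ledgers/<tenant>/<namespace>/persistent/<topic>。因此,我们可以先从 zk 中获取特定 topic 的 ledger 信息,然后连接到 BK,逐个读取该 ledger 下每个 entry 中的消息。具体代码如下: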

package tmp;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Reads one Pulsar managed-ledger directly from BookKeeper, bypassing the broker, and logs the
 * metadata and payload of every confirmed entry.
 *
 * <p>Payload decoding assumes each entry holds a single non-batched, uncompressed, unencrypted
 * message produced with a string schema (as in the accompanying write-up); other producers will
 * need extra handling (batch splitting, decompression, decryption).
 *
 * @date 2021/8/10 16:10
 */
public class BKMain {
    private static final Logger log = LoggerFactory.getLogger(BKMain.class);

    /** ZooKeeper connect string; BookKeeper keeps its metadata under /ledgers on this ensemble. */
    private static final String ZK_CONNECTION_STRING = "x.x.x.x:2181";

    /**
     * Ledger to dump. Look it up in the topic's managed-ledger znode, e.g.
     * /managed-ledgers/tenant_c/ns1/persistent/storeV2 for topic storeV2.
     */
    private static final long LEDGER_ID = 56990;

    /** Maximum number of entries fetched per read call, so large ledgers are not loaded at once. */
    private static final long READ_BATCH_SIZE = 100;

    public static void main(String[] args) throws Exception {
        BKMain main = new BKMain();
        main.run();
    }

    void run() throws Exception {
        load01();
    }

    /**
     * Opens {@link #LEDGER_ID} without recovery and logs every entry from 0 up to the last
     * add-confirmed entry.
     *
     * @throws Exception if connecting, opening the ledger, reading, or parsing an entry fails
     */
    void load01() throws Exception {
        // The client, the read handle and each LedgerEntries batch are AutoCloseable;
        // try-with-resources releases them even when a read or a parse throws.
        try (BookKeeper bkClient = new BookKeeper(ZK_CONNECTION_STRING);
                // Digest type and (empty) password must match what Pulsar used when writing.
                // withRecovery(false) opens the ledger without fencing it, so a live writer is
                // not interrupted.
                ReadHandle handle = bkClient.newOpenLedgerOp()
                        .withRecovery(false)
                        .withLedgerId(LEDGER_ID)
                        .withDigestType(DigestType.CRC32C)
                        .withPassword("".getBytes(Charsets.UTF_8))
                        .execute()
                        .get()) {
            // Entries up to the last add-confirmed id are the ones visible to readers.
            long lastAddConfirmed = handle.readLastAddConfirmed();
            log.info("id={}", lastAddConfirmed);
            if (lastAddConfirmed < 0) {
                // An empty ledger reports -1; read(0, -1) would fail, so there is nothing to do.
                log.info("ledger {} has no confirmed entries", LEDGER_ID);
                return;
            }
            for (long first = 0; first <= lastAddConfirmed; first += READ_BATCH_SIZE) {
                long last = Math.min(first + READ_BATCH_SIZE - 1, lastAddConfirmed);
                try (LedgerEntries entries = handle.read(first, last)) {
                    for (LedgerEntry entry : entries) {
                        log.info(describeEntry(entry));
                    }
                }
            }
        }
    }

    /**
     * Renders one entry as a single log line: BookKeeper coordinates, selected Pulsar message
     * metadata, and the payload decoded as UTF-8.
     *
     * <p>Consumes the entry's buffer (its reader index ends at the end of the payload).
     */
    private static String describeEntry(LedgerEntry entry) {
        ByteBuf buf = entry.getEntryBuffer();
        // Raw entry size as stored, captured before any parsing. entry.getLength() is avoided:
        // for entries read back from bookies it carries the ledger's accumulated length up to this
        // entry, not the size of this entry.
        int entrySize = buf.readableBytes();

        // Skip the optional broker-entry metadata and checksum, then parse the message metadata;
        // whatever remains in the buffer is the message payload.
        Commands.skipBrokerEntryMetadataIfExist(buf);
        Commands.skipChecksumIfPresent(buf);
        MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
        byte[] payload = new byte[buf.readableBytes()];
        buf.readBytes(payload);

        StringBuilder sb = new StringBuilder();
        sb.append("ledgerId=").append(entry.getLedgerId());
        sb.append(", entryId=").append(entry.getEntryId());
        sb.append(", entrySize=").append(entrySize);
        sb.append(", dataLen=").append(payload.length);
        sb.append(", numMessages=").append(msgMetadata.getNumMessagesInBatch());
        sb.append(", publishTime=").append(msgMetadata.getPublishTime());
        sb.append(", sequenceId=").append(msgMetadata.getSequenceId());
        // Optional fields must be checked with hasXxx() before reading, otherwise the getter throws.
        sb.append(", numChunks=")
                .append(msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0);
        sb.append(", replicateFrom=")
                .append(msgMetadata.hasReplicatedFrom() ? msgMetadata.getReplicatedFrom() : "none");
        // Pulsar's string schema encodes as UTF-8; do not rely on the platform default charset.
        sb.append(", data=").append(new String(payload, Charsets.UTF_8));
        return sb.toString();
    }
}

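代码执行结果(注意:下面示例中的 dataLen 是按 entry.getLength() 加上解析后剩余的字节数计算的;对于从 bookie 读回的 entry,getLength() 返回的是 ledger 截至该 entry 的累计长度,所以 dataLen 每条递增 48,并不是单条 entry 或消息体的实际大小):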

ledgerId=56990, entryId=0, dataLen=90, numMessages=1, publishTime=1628583539500, sequenceId=0, numChunks=0, replicateFrom=none, data=value 0
ledgerId=56990, entryId=1, dataLen=138, numMessages=1, publishTime=1628583539504, sequenceId=1, numChunks=0, replicateFrom=none, data=value 1
ledgerId=56990, entryId=2, dataLen=186, numMessages=1, publishTime=1628583539504, sequenceId=2, numChunks=0, replicateFrom=none, data=value 2
Author: iMine
Link: https://imine141.github.io/2021/08/10/pulsar/%E8%AF%BB%E5%8F%96%E5%92%8C%E8%A7%A3%E6%9E%90pulsar%E4%BF%9D%E5%AD%98%E5%9C%A8bookkeeper%E4%B8%8A%E7%9A%84%E6%B6%88%E6%81%AF/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.