Pulsar 使用 BookKeeper(以下简称 bk)对消息进行持久化。为了实现只读 broker,需要绕过 Pulsar,直接连接 bk 读取消息。在 bk 中,ledger 是一个日志段,bk 以 ledger 为粒度进行数据的多地备份;一个 ledger 包含多个 entry,每个 entry 对应 Pulsar 中的一条或多条消息。
Pulsar 中每个 topic 在 bk 中的存储信息保存在 ZooKeeper(以下简称 zk)中。因此,我们可以先从 zk 中获取指定 topic 的 ledger 信息,再连接到 bk,读取该 ledger 下每个 entry 中的消息。具体代码如下:
package tmp; import com.google.common.base.Charsets; import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
 * Example that bypasses the Pulsar broker and reads the entries of one ledger directly
 * from BookKeeper, logging the Pulsar message metadata and payload of every entry.
 */
public class BKMain {
    private static final Logger log = LoggerFactory.getLogger(BKMain.class);

    /**
     * Program entry point; runs the ledger dump once.
     *
     * @param args ignored
     * @throws Exception if connecting to BookKeeper or reading the ledger fails
     */
    public static void main(String[] args) throws Exception {
        BKMain main = new BKMain();
        main.run();
    }

    /**
     * Runs the example.
     *
     * @throws Exception if connecting to BookKeeper or reading the ledger fails
     */
    void run() throws Exception {
        load01();
    }

    /**
     * Opens the hard-coded ledger without recovery, reads entries 0 through the last add
     * confirmed entry, and logs one line per entry.
     *
     * <p>The client, the ledger handle and the entry batch are released even when a read or
     * a parse step throws.
     *
     * @throws Exception if the client cannot be created, the ledger cannot be opened, or an
     *     entry cannot be read or parsed
     */
    void load01() throws Exception {
        String connectionString = "x.x.x.x:2181";
        long ledgerId = 56990;

        // Resources are closed in reverse order: entries, then the handle, then the client.
        try (BookKeeper bkClient = new BookKeeper(connectionString);
             ReadHandle handle = bkClient.newOpenLedgerOp()
                     .withRecovery(false)
                     .withLedgerId(ledgerId)
                     .withDigestType(DigestType.CRC32C)
                     .withPassword("".getBytes(Charsets.UTF_8))
                     .execute()
                     .get()) {
            long id = handle.readLastAddConfirmed();
            log.info("id={}", id);

            long firstEntry = 0;
            long lastEntry = id;
            if (lastEntry < firstEntry) {
                // Nothing has been confirmed in this ledger; read(0, -1) would be an invalid range.
                return;
            }

            try (LedgerEntries entries = handle.read(firstEntry, lastEntry)) {
                for (LedgerEntry entry : entries) {
                    log.info(describe(entry));
                }
            }
        }
    }

    /**
     * Builds the log line for one ledger entry: ids, length figure, selected message metadata
     * fields and the remaining bytes decoded as UTF-8 text.
     *
     * <p>The calls are order-sensitive because each one advances the reader index of the
     * entry buffer.
     */
    private static String describe(LedgerEntry entry) {
        ByteBuf buf = entry.getEntryBuffer();
        Commands.skipBrokerEntryMetadataIfExist(buf);
        Commands.skipChecksumIfPresent(buf);

        StringBuilder sb = new StringBuilder();
        sb.append("ledgerId=").append(entry.getLedgerId());
        sb.append(", entryId=").append(entry.getEntryId());
        // NOTE(review): this sums entry.getLength() with the bytes left after the skipped
        // headers; kept unchanged so the logged value stays the same - confirm it is the
        // intended figure.
        sb.append(", dataLen=").append(entry.getLength() + buf.readableBytes());

        MessageMetadata msgMetadata = Commands.parseMessageMetadata(buf);
        sb.append(", numMessages=").append(msgMetadata.getNumMessagesInBatch());
        sb.append(", publishTime=").append(msgMetadata.getPublishTime());
        sb.append(", sequenceId=").append(msgMetadata.getSequenceId());
        sb.append(", numChunks=")
                .append(msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0);
        sb.append(", replicateFrom=")
                .append(msgMetadata.hasReplicatedFrom() ? msgMetadata.getReplicatedFrom() : "none");

        // Whatever is left after the metadata is treated as the payload.
        // NOTE(review): no batch or compression handling here - presumably only single,
        // uncompressed text messages are expected; confirm.
        byte[] dataBytes = new byte[buf.readableBytes()];
        buf.readBytes(dataBytes);
        // Explicit charset so the output does not depend on the platform default.
        sb.append(", data=").append(new String(dataBytes, Charsets.UTF_8));
        return sb.toString();
    }
}
|
代码执行结果:
ledgerId=56990, entryId=0, dataLen=90, numMessages=1, publishTime=1628583539500, sequenceId=0, numChunks=0, replicateFrom=none, data=value 0
ledgerId=56990, entryId=1, dataLen=138, numMessages=1, publishTime=1628583539504, sequenceId=1, numChunks=0, replicateFrom=none, data=value 1
ledgerId=56990, entryId=2, dataLen=186, numMessages=1, publishTime=1628583539504, sequenceId=2, numChunks=0, replicateFrom=none, data=value 2
|