/**
 * Receives a single message, waiting at most the given amount of time.
 *
 * <p>Two configuration guards run first, in this order: the configured receiver queue
 * size must not be zero, and no message listener may be registered. The consumer state
 * is then verified and the call is handed to {@code internalReceive}.
 *
 * @param timeout maximum time to wait, expressed in {@code unit}
 * @param unit    time unit of {@code timeout}
 * @return whatever {@code internalReceive(timeout, unit)} returns
 * @throws PulsarClientException.InvalidConfigurationException if the receiver queue size
 *         is 0 or a listener has been set
 * @throws PulsarClientException if state verification or the underlying receive fails
 */
@Override
public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
    final boolean queueDisabled = conf.getReceiverQueueSize() == 0;
    if (queueDisabled) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Can't use receive with timeout, if the queue size is 0");
    }

    final boolean listenerInstalled = listener != null;
    if (listenerInstalled) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Cannot use receive() when a listener has been set");
    }

    verifyConsumerState();
    return internalReceive(timeout, unit);
}
ConsumerImpl
@Override protected Message<T> internalReceive(int timeout, TimeUnit unit)throws PulsarClientException { Message<T> message; try { // 先从缓存队列中获取,如果获取不到则直接返回。 message = incomingMessages.poll(timeout, unit); if (message == null) { returnnull; } // 暂时不返回消息,而是给broker发送通知,表示自己可以接收更多的消息了。 messageProcessed(message); return beforeConsume(message); } catch (InterruptedException e) { Statestate= getState(); if (state != State.Closing && state != State.Closed) { stats.incrementNumReceiveFailed(); throw PulsarClientException.unwrap(e); } else { returnnull; } } } @Override protectedsynchronizedvoidmessageProcessed(Message<?> msg) { ClientCnxcurrentCnx= cnx(); ClientCnxmsgCnx= ((MessageImpl<?>) msg).getCnx(); lastDequeuedMessageId = msg.getMessageId(); if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. } else { increaseAvailablePermits(currentCnx); stats.updateNumMsgsReceived(msg); trackMessage(msg); } decreaseIncomingMessageSize(msg); } voidincreaseAvailablePermits(ClientCnx currentCnx) { increaseAvailablePermits(currentCnx, 1); } protectedvoidincreaseAvailablePermits(ClientCnx currentCnx, int delta) { intavailable= AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta); while (available >= receiverQueueRefillThreshold && !paused) { // 确保能发送一次通知 if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) { sendFlowPermitsToBroker(currentCnx, available); break; } else { available = AVAILABLE_PERMITS_UPDATER.get(this); } } } /** * send the flow command to have the broker start pushing messages * 发送flow通知给broker,异步获取更多消息。 */ privatevoidsendFlowPermitsToBroker(ClientCnx cnx, int numMessages) { if (cnx != null && numMessages > 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Adding {} additional permits", topic, subscription, numMessages); } if (log.isDebugEnabled()) { cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages)) .addListener(writeFuture -> { if (!writeFuture.isSuccess()) { 
log.debug("Consumer {} failed to send {} permits to broker: {}", consumerId, numMessages, writeFuture.cause().getMessage()); } else { log.debug("Consumer {} sent {} permits to broker", consumerId, numMessages); } }); } else { cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise()); } } }
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
syntax = "proto2";

option java_package = "org.apache.bookkeeper.mledger.proto";
option optimize_for = SPEED;

// String key/value pair.
message KeyValue {
    required string key = 1;
    required string value = 2;
}

// Name of an offload driver plus its key/value properties.
message OffloadDriverMetadata {
    required string name = 1;
    repeated KeyValue properties = 2;
}

message OffloadContext {
    optional int64 uidMsb = 1;
    optional int64 uidLsb = 2;
    optional bool complete = 3;
    optional bool bookkeeperDeleted = 4;
    optional int64 timestamp = 5;
    optional OffloadDriverMetadata driverMetadata = 6;
    repeated OffloadSegment offloadSegment = 7;
}

message OffloadSegment {
    optional int64 uidMsb = 1;
    optional int64 uidLsb = 2;
    optional bool complete = 3;
    optional int64 assignedTimestamp = 4; // timestamp in milliseconds
    optional int64 offloadedTimestamp = 5; // timestamp in milliseconds
    optional int64 endEntryId = 6;
    optional OffloadDriverMetadata driverMetadata = 7;
}

message ManagedLedgerInfo {
    message LedgerInfo {
        required int64 ledgerId = 1;
        optional int64 entries = 2;
        optional int64 size = 3;
        optional int64 timestamp = 4;
        optional OffloadContext offloadContext = 5;
    }

    repeated LedgerInfo ledgerInfo = 1;

    // If present, it signals the managed ledger has been
    // terminated and this was the position of the last
    // committed entry.
    // No more entries can be written.
    optional NestedPositionInfo terminatedPosition = 2;

    repeated KeyValue properties = 3;
}

message PositionInfo {
    required int64 ledgerId = 1;
    required int64 entryId = 2;
    repeated MessageRange individualDeletedMessages = 3;

    // Additional custom properties associated with
    // the current cursor position
    repeated LongProperty properties = 4;

    // Store which index in the batch message has been deleted
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
}

// A (ledgerId, entryId) pair.
message NestedPositionInfo {
    required int64 ledgerId = 1;
    required int64 entryId = 2;
}

// A range described by a lower and an upper position endpoint.
message MessageRange {
    required NestedPositionInfo lowerEndpoint = 1;
    required NestedPositionInfo upperEndpoint = 2;
}

message BatchedEntryDeletionIndexInfo {
    required NestedPositionInfo position = 1;
    repeated int64 deleteSet = 2;
}

// Generic string and long tuple
message LongProperty {
    required string name = 1;
    required int64 value = 2;
}

message ManagedCursorInfo {
    // If the ledger id is -1, then the mark-delete position is
    // the one from the (ledgerId, entryId) snapshot below
    required int64 cursorsLedgerId = 1;

    // Last snapshot of the mark-delete position
    optional int64 markDeleteLedgerId = 2;
    optional int64 markDeleteEntryId = 3;
    repeated MessageRange individualDeletedMessages = 4;

    // Additional custom properties associated with
    // the current cursor position
    repeated LongProperty properties = 5;

    optional int64 lastActive = 6;

    // Store which index in the batch message has been deleted
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}
2.3 判断topic是属于哪一个broker
先通过topic查找其所属的bundle，然后判断该bundle是否归当前broker所有。
OwnershipCache中的ownershipReadOnlyCache:
private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> resolveOwnership(String path) { // 通过缓存的zk信息,找到该path(topic全路径)对应的的bundle信息 return ownershipReadOnlyCache.getWithStatAsync(path).thenApply(optionalOwnerDataWithStat -> { if (optionalOwnerDataWithStat.isPresent()) { Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat = optionalOwnerDataWithStat.get(); Stat stat = ownerDataWithStat.getValue(); if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) { LOG.info("Successfully reestablish ownership of {}", path); OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitUtils.suBundleFromPath(path, bundleFactory)); if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) { ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle)); } ownershipReadOnlyCache.invalidate(path); } } return optionalOwnerDataWithStat; }); }