ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

/**
 * Set the delay to wait before re-delivering messages that have failed to be processed.
 *
 * <p>When the application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
 * will be redelivered after a fixed timeout. The default is 1 min.
 *
 * @param redeliveryDelay
 *            redelivery delay for failed messages
 * @param timeUnit
 *            unit in which the redelivery delay is provided
 * @return the consumer builder instance
 * @see Consumer#negativeAcknowledge(Message)
 */
ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);
Consumer 接口的 negativeAcknowledge 方法,用于通知 broker 自己处理消息失败了。
/**
 * Acknowledge the failure to process a single message.
 *
 * <p>When a message is "negatively acked" it will be marked for redelivery after
 * some fixed delay. The delay is configurable when constructing the consumer
 * with {@link ConsumerBuilder#negativeAckRedeliveryDelay(long, TimeUnit)}.
 *
 * <p>This call is not blocking.
 *
 * <p>Example of usage:
 * <pre>{@code
 * while (true) {
 *     Message<String> msg = consumer.receive();
 *
 *     try {
 *         // Process message...
 *
 *         consumer.acknowledge(msg);
 *     } catch (Throwable t) {
 *         log.warn("Failed to process message");
 *         consumer.negativeAcknowledge(msg);
 *     }
 * }
 * }</pre>
 *
 * @param message
 *            The {@code Message} to be negatively acknowledged
 */
void negativeAcknowledge(Message<?> message);
@Override publicvoidnegativeAcknowledge(MessageId messageId) { negativeAcksTracker.add(messageId); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" unAckedMessageTracker.remove(messageId); }
使用接收消息的netty channel发送失败消息通知给broker
@Override publicvoidredeliverUnacknowledgedMessages(Set<MessageId> messageIds) { if (messageIds.isEmpty()) { return; } checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl); if (conf.getSubscriptionType() != SubscriptionType.Shared && conf.getSubscriptionType() != SubscriptionType.Key_Shared) { // We cannot redeliver single messages if subscription type is not Shared redeliverUnacknowledgedMessages(); return; } ClientCnxcnx= cnx(); if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) { intmessagesFromQueue= removeExpiredMessagesFromQueue(messageIds); Iterable<List<MessageIdImpl>> batches = Iterables.partition( messageIds.stream() .map(messageId -> (MessageIdImpl)messageId) .collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED); batches.forEach(ids -> { getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> { if (!messageIdData.isEmpty()) { ByteBufcmd= Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData); cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); } }); }); if (messagesFromQueue > 0) { increaseAvailablePermits(cnx, messagesFromQueue); } if (log.isDebugEnabled()) { log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", subscription, topic, consumerName, messagesFromQueue); } return; } if (cnx == null || (getState() == State.Connecting)) { log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this); } else { log.warn("[{}] Reconnecting the client to redeliver the messages.", this); cnx.ctx().close(); } }
4.2 NegativeAcksTracker
添加处理失败的消息到缓存中
publicsynchronizedvoidadd(MessageId messageId) { if (messageId instanceof BatchMessageIdImpl) { BatchMessageIdImplbatchMessageId= (BatchMessageIdImpl) messageId; messageId = newMessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()); } if (nackedMessages == null) { nackedMessages = newHashMap<>(); } nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos); if (this.timeout == null) { // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for // nack immediately following the current one will be batched into the same redeliver request. this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); } }
通过定时任务触发,把到期的处理失败消息批量通知给broker重新投递
privatesynchronizedvoidtriggerRedelivery(Timeout t) { if (nackedMessages.isEmpty()) { this.timeout = null; return; } // Group all the nacked messages into one single re-delivery request Set<MessageId> messagesToRedeliver = newHashSet<>(); longnow= System.nanoTime(); nackedMessages.forEach((msgId, timestamp) -> { if (timestamp < now) { addChunkedMessageIdsAndRemoveFromSequnceMap(msgId, messagesToRedeliver, this.consumer); messagesToRedeliver.add(msgId); } }); messagesToRedeliver.forEach(nackedMessages::remove); consumer.onNegativeAcksSend(messagesToRedeliver); // 使用netty发送消息给broker consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); }