Pulsar failure retry (negativeAckRedeliveryDelay) explained

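I came across a question about this API in a Pulsar discussion group, so I read the code to understand what it does and how it is processed in detail. These are my notes.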

1. Message publish/subscribe flow

[Figure: message publish/subscribe flow]

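As shown in the figure above, once a consumer receives a message and sends back an acknowledgment, the broker considers the message successfully delivered. However, the consumer may fail to process a message it has received and therefore never acknowledge it. The broker keeps a list of each consumer's unacknowledged messages. Once that list reaches a limit (the broker setting maxUnackedMessagesPerConsumer), the consumer is blocked: the broker stops dispatching new messages to it and does not resend the earlier unacknowledged ones either. If the consumer disconnects from the broker, its unacknowledged messages are delivered again.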
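The following in-memory model of the broker's bookkeeping for one consumer makes the blocking behavior concrete. It is a sketch under simplifying assumptions. The class and method names are hypothetical, and the limit parameter stands in for maxUnackedMessagesPerConsumer. The consumer is unblocked as soon as the count drops below the limit, which may not match the real broker's unblocking threshold.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class UnackedLimitDemo {
    public static void main(String[] args) {
        UnackedLimitModel broker = new UnackedLimitModel(2);
        System.out.println(broker.tryDispatch(1)); // true
        System.out.println(broker.tryDispatch(2)); // true: limit reached, consumer now blocked
        System.out.println(broker.tryDispatch(3)); // false: nothing is sent while blocked
        broker.ack(1);
        System.out.println(broker.tryDispatch(3)); // true: the ack freed a slot
    }
}

/** Hypothetical model of one consumer's pending (unacknowledged) messages on the broker. */
class UnackedLimitModel {
    private final int limit; // stand-in for maxUnackedMessagesPerConsumer
    private final Set<Long> pendingAcks = new LinkedHashSet<>();
    private boolean blocked;

    UnackedLimitModel(int limit) {
        this.limit = limit;
    }

    /** Dispatches a message unless the consumer is blocked; returns whether it was sent. */
    boolean tryDispatch(long messageId) {
        if (blocked) {
            return false; // neither new nor previously unacked messages are sent
        }
        pendingAcks.add(messageId);
        blocked = pendingAcks.size() >= limit;
        return true;
    }

    /** An ack removes the message from the pending set and may unblock the consumer (assumed rule). */
    void ack(long messageId) {
        pendingAcks.remove(messageId);
        if (pendingAcks.size() < limit) {
            blocked = false;
        }
    }

    /** On disconnect, everything still pending has to be delivered again. */
    Set<Long> disconnect() {
        Set<Long> toRedeliver = new LinkedHashSet<>(pendingAcks);
        pendingAcks.clear();
        blocked = false;
        return toRedeliver;
    }

    boolean isBlocked() {
        return blocked;
    }
}
```

Calling disconnect() returns the messages that would be delivered again after the consumer drops its connection.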

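Someone in the group then noticed negativeAckRedeliveryDelay and wondered whether it could be used to consume a message again, that is, to receive it again if it has not been acknowledged within some time. After reading the code I found that this does work, with one caveat. Redelivery is triggered by an explicit negative acknowledgment, not by the mere absence of an ack; that case is covered by the separate ackTimeout setting. When the handler reports a failure, the client sends the IDs of the failed messages to the broker, and the broker delivers those messages to a consumer again.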

2. How messages are redelivered

[Figure: message redelivery flow]

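The broker sends Msg01 to consumer1. Consumer1 fails to process it in its handler and calls negativeAcknowledge. Once the redelivery delay has passed, the client sends the ID of the failed message to the broker, which then dispatches the message again to either consumer1 or consumer2.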
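The same flow can be simulated without a broker. The sketch below is a hypothetical discrete-time model, not Pulsar code. In the model:

- The broker hands out at most one message per tick in round-robin order, a simplification of how a Shared subscription picks consumers.
- Consumer1 negatively acknowledges everything and consumer2 acknowledges everything.
- nackDelayTicks plays the role of negativeAckRedeliveryDelay.

The maxTicks bound exists only because a message could in principle keep landing on consumer1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

class SharedRedeliveryDemo {
    public static void main(String[] args) {
        simulate(List.of("Msg01"), 2, 10).forEach(System.out::println);
    }

    /** A nacked message and the tick at which it becomes due for redelivery. */
    record PendingNack(String message, int dueTick) {}

    /**
     * Hypothetical model: consumer1 always nacks, consumer2 always acks, and the
     * broker hands out at most one message per tick in round-robin order.
     */
    static List<String> simulate(List<String> messages, int nackDelayTicks, int maxTicks) {
        String[] consumers = {"consumer1", "consumer2"};
        Deque<String> dispatchQueue = new ArrayDeque<>(messages);
        List<PendingNack> pendingNacks = new ArrayList<>(); // stand-in for the client-side tracker
        List<String> events = new ArrayList<>();
        int cursor = 0;
        int acked = 0;
        for (int tick = 0; tick < maxTicks && acked < messages.size(); tick++) {
            // Nacks whose delay has expired are reported back and jump ahead of new messages.
            for (Iterator<PendingNack> it = pendingNacks.iterator(); it.hasNext(); ) {
                PendingNack nack = it.next();
                if (nack.dueTick() <= tick) {
                    dispatchQueue.addFirst(nack.message());
                    it.remove();
                }
            }
            String message = dispatchQueue.poll();
            if (message == null) {
                continue; // nothing to dispatch this tick
            }
            String consumer = consumers[cursor];
            cursor = (cursor + 1) % consumers.length;
            if (consumer.equals("consumer1")) {
                pendingNacks.add(new PendingNack(message, tick + nackDelayTicks));
                events.add("t=" + tick + " " + consumer + " nack " + message);
            } else {
                acked++;
                events.add("t=" + tick + " " + consumer + " ack " + message);
            }
        }
        return events;
    }
}
```

With one message and a delay of 2 ticks, the log is "t=0 consumer1 nack Msg01" followed by "t=2 consumer2 ack Msg01", which matches the figure.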

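Two APIs are central here:

ConsumerBuilder.negativeAckRedeliveryDelay sets how long to wait before a failed message is delivered again (default 1 minute):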

/**
 * Set the delay to wait before re-delivering messages that have failed to be process.
 *
 * <p>When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
 * will be redelivered after a fixed timeout. The default is 1 min.
 *
 * @param redeliveryDelay
 *            redelivery delay for failed messages
 * @param timeUnit
 *            unit in which the timeout is provided.
 * @return the consumer builder instance
 * @see Consumer#negativeAcknowledge(Message)
 */
ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);

Consumer.negativeAcknowledge reports that processing a message failed:

/**
 * Acknowledge the failure to process a single message.
 *
 * <p>When a message is "negatively acked" it will be marked for redelivery after
 * some fixed delay. The delay is configurable when constructing the consumer
 * with {@link ConsumerBuilder#negativeAckRedeliveryDelay(long, TimeUnit)}.
 *
 * <p>This call is not blocking.
 *
 * <p>Example of usage:
 * <pre><code>
 * while (true) {
 *     Message&lt;String&gt; msg = consumer.receive();
 *
 *     try {
 *         // Process message...
 *
 *         consumer.acknowledge(msg);
 *     } catch (Throwable t) {
 *         log.warn("Failed to process message");
 *         consumer.negativeAcknowledge(msg);
 *     }
 * }
 * </code></pre>
 *
 * @param message
 *            The {@code Message} to be acknowledged
 */
void negativeAcknowledge(Message<?> message);

3. Test

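The test plan:

  1. Start two clients that subscribe to the same topic through the same Shared subscription.
  2. The first client negatively acknowledges every message it receives; the second acknowledges normally.
  3. Check whether the messages negatively acknowledged by the first client are received by the second client.

Test code: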

package ack;

import auth.client.VVAuthentication;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @function
 * @date 2021/8/4 16:23
 */
public class RedeliverTest {
    private static final Logger log = LoggerFactory.getLogger(RedeliverTest.class);

    public static void main(String[] args) throws Exception {
        RedeliverTest main = new RedeliverTest();
        main.run();
    }

    String topic = "persistent://tenant_vv/ns1/redeliver_test";

    void run() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(() -> consumer(true, latch)).start();

        // Wait until the first consumer has subscribed, so that it is the first to receive the messages
        latch.await();
        new Thread(() -> consumer(false, latch)).start();
        produce();
        log.info("done");
    }

    void produce() throws Exception {
        try (PulsarClient client = newClient()) {
            Producer<String> p = client.newProducer(Schema.STRING)
                    .topic(topic)
                    .create();
            for (int i = 0; i < 2; i++) {
                p.newMessage().key("sub mode " + i).value("data_" + i).sendAsync();
            }
            // Wait until the asynchronous sends have been persisted before closing
            p.flush();
            p.close();
        } catch (Exception e) {
            log.error("", e);
        }
    }

    void consumer(boolean replay, CountDownLatch latch) {
        try (PulsarClient client = newClient()) {
            String tag = "replay_mode_" + replay;
            Consumer<String> c = client.newConsumer(Schema.STRING)
                    .topic(topic)
                    // Redeliver a negatively acknowledged message after 2 seconds
                    .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName("sub_name")
                    .subscribe();
            latch.countDown();
            while (true) {
                // Stop once no message has arrived for 10 seconds
                Message<String> msg = c.receive(10, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                if (replay) {
                    c.negativeAcknowledge(msg);
                } else {
                    c.acknowledge(msg);
                }
                log.info("{} receive {}", tag, msg.getValue());
            }
            c.close();
        } catch (Exception e) {
            log.error("", e);
        }
    }

    PulsarClient newClient() throws Exception {
        return PulsarClient.builder()
                .serviceUrl("pulsar://x.x.x.x:6650")
                .authentication(new VVAuthentication())
                .build();
    }
}

Test output:

16:52:56.639 [Thread-1] INFO  ack.RedeliverTest - replay_mode_true receive data_0
16:52:56.639 [Thread-1] INFO ack.RedeliverTest - replay_mode_true receive data_1
16:52:58.660 [Thread-3] INFO ack.RedeliverTest - replay_mode_false receive data_0
16:52:58.660 [Thread-3] INFO ack.RedeliverTest - replay_mode_false receive data_1

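As the output shows, the first client received the messages and called negativeAcknowledge, and the second client received them about 2 seconds later. In this run they also arrived in their original order, although a Shared subscription does not guarantee ordering in general.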
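The delay can be read straight off the log timestamps. This small check uses only java.time to compute the gap between consumer1's and consumer2's first log lines. The helper name gapMillis is hypothetical.

```java
import java.time.Duration;
import java.time.LocalTime;

class LogGapDemo {
    public static void main(String[] args) {
        System.out.println(gapMillis("16:52:56.639", "16:52:58.660") + " ms");
    }

    /** Hypothetical helper: milliseconds between two "HH:mm:ss.SSS" timestamps from the test log. */
    static long gapMillis(String first, String second) {
        return Duration.between(LocalTime.parse(first), LocalTime.parse(second)).toMillis();
    }
}
```

It prints 2021 ms: the configured 2 seconds plus about 21 ms. The extra time presumably comes from the tracker's timer granularity and the network round trip.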

4. Relevant source code

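The overall logic works as follows. When the consumer fails to process a message, the client records the message ID, and once the redelivery delay has passed it notifies the broker. The broker removes the message from the consumer's pending acks and records its position as one that needs to be resent. Each time the broker reads messages to dispatch to consumers, it first checks for such positions and, if there are any, sends those messages again.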

4.1 ConsumerImpl

Record the message ID for later redelivery, and stop tracking it for ack timeout:

@Override
public void negativeAcknowledge(MessageId messageId) {
    negativeAcksTracker.add(messageId);
    // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
    unAckedMessageTracker.remove(messageId);
}

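Send the redelivery request to the broker over the Netty channel the consumer receives messages on. Individual messages can only be redelivered for Shared and Key_Shared subscriptions. For other subscription types, the client falls back to redelivering all unacknowledged messages: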

@Override
public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
    if (messageIds.isEmpty()) {
        return;
    }
    checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
    if (conf.getSubscriptionType() != SubscriptionType.Shared
            && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
        // We cannot redeliver single messages if subscription type is not Shared
        redeliverUnacknowledgedMessages();
        return;
    }
    ClientCnx cnx = cnx();
    if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) {
        int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
        Iterable<List<MessageIdImpl>> batches = Iterables.partition(
                messageIds.stream()
                        .map(messageId -> (MessageIdImpl) messageId)
                        .collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED);
        batches.forEach(ids -> {
            getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
                if (!messageIdData.isEmpty()) {
                    ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                }
            });
        });
        if (messagesFromQueue > 0) {
            increaseAvailablePermits(cnx, messagesFromQueue);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", subscription, topic,
                    consumerName, messagesFromQueue);
        }
        return;
    }
    if (cnx == null || (getState() == State.Connecting)) {
        log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this);
    } else {
        log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
        cnx.ctx().close();
    }
}
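The batching step above relies on Guava's Iterables.partition to cap each redeliver command at MAX_REDELIVER_UNACKNOWLEDGED IDs. The same chunking can be written with the JDK alone. The names partition and maxPerRequest are illustrative, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.List;

class RedeliverBatchDemo {
    public static void main(String[] args) {
        // 5 message IDs, at most 2 per redeliver command
        System.out.println(partition(List.of(1L, 2L, 3L, 4L, 5L), 2)); // [[1, 2], [3, 4], [5]]
    }

    /** Illustrative JDK-only equivalent of Iterables.partition: consecutive chunks of at most maxPerRequest. */
    static <T> List<List<T>> partition(List<T> ids, int maxPerRequest) {
        if (maxPerRequest <= 0) {
            throw new IllegalArgumentException("maxPerRequest must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < ids.size(); start += maxPerRequest) {
            int end = Math.min(start + maxPerRequest, ids.size());
            batches.add(new ArrayList<>(ids.subList(start, end)));
        }
        return batches;
    }
}
```

Each inner list would become one RedeliverUnacknowledgedMessages command, so a large number of nacks never produces a single oversized frame.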

4.2 NegativeAcksTracker

Add the failed message to the tracker together with its redelivery deadline:

public synchronized void add(MessageId messageId) {
    if (messageId instanceof BatchMessageIdImpl) {
        BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
        messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
                batchMessageId.getPartitionIndex());
    }
    if (nackedMessages == null) {
        nackedMessages = new HashMap<>();
    }
    nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos);
    if (this.timeout == null) {
        // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
        // nack immediately following the current one will be batched into the same redeliver request.
        this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
    }
}
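Two details of add are worth isolating. First, a BatchMessageIdImpl is collapsed to a plain ID made of ledger ID, entry ID, and partition index, so nacking several messages from the same batch produces a single tracker entry. Second, each entry stores an absolute deadline, now + nackDelayNanos. The records below are hypothetical stand-ins for Pulsar's message ID classes, and the clock is passed in so the result is deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class NackAddDemo {
    public static void main(String[] args) {
        Map<EntryId, Long> nacked = new HashMap<>();
        long delay = TimeUnit.SECONDS.toNanos(2);
        long now = 0L; // fixed clock for the example
        add(nacked, new BatchMsgId(7, 3, -1, 0), now, delay);
        add(nacked, new BatchMsgId(7, 3, -1, 1), now, delay); // same entry, different batch index
        System.out.println(nacked); // a single entry whose deadline is 2 s in nanoseconds
    }

    /** Hypothetical stand-in for a plain message ID: one entry of one ledger of one partition. */
    record EntryId(long ledgerId, long entryId, int partitionIndex) {}

    /** Hypothetical stand-in for a batch message ID: an entry plus the index inside the batch. */
    record BatchMsgId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {}

    /** Drops the batch index and records (or refreshes) the redelivery deadline. */
    static void add(Map<EntryId, Long> nacked, BatchMsgId id, long nowNanos, long nackDelayNanos) {
        EntryId key = new EntryId(id.ledgerId(), id.entryId(), id.partitionIndex());
        nacked.put(key, nowNanos + nackDelayNanos);
    }
}
```

A later nack of the same entry simply overwrites the deadline, just as HashMap.put does in the original.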

A timer task sends the entries whose delay has expired to the broker:

private synchronized void triggerRedelivery(Timeout t) {
    if (nackedMessages.isEmpty()) {
        this.timeout = null;
        return;
    }
    // Group all the nacked messages into one single re-delivery request
    Set<MessageId> messagesToRedeliver = new HashSet<>();
    long now = System.nanoTime();
    nackedMessages.forEach((msgId, timestamp) -> {
        if (timestamp < now) {
            addChunkedMessageIdsAndRemoveFromSequnceMap(msgId, messagesToRedeliver, this.consumer);
            messagesToRedeliver.add(msgId);
        }
    });
    messagesToRedeliver.forEach(nackedMessages::remove);
    consumer.onNegativeAcksSend(messagesToRedeliver);
    // Send the redelivery request to the broker over Netty (see ConsumerImpl.redeliverUnacknowledgedMessages)
    consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
    this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
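triggerRedelivery is a periodic sweep. It collects every entry whose deadline is earlier than now, removes those entries, sends them as one request, and reschedules itself after timerIntervalNanos. Below is a JDK-only sketch of the sweep itself, with the clock passed in so it can be tested. The name sweep and the Long message IDs are illustrative, and the timer rescheduling is left out. The original collects IDs first and removes them afterwards because it iterates with forEach; the sketch removes through the iterator instead, which has the same effect.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class NackSweepDemo {
    public static void main(String[] args) {
        Map<Long, Long> nacked = new HashMap<>(Map.of(1L, 100L, 2L, 250L, 3L, 90L));
        System.out.println(sweep(nacked, 150L)); // [1, 3] are due; 2 waits for a later sweep
        System.out.println(nacked.keySet());      // [2]
    }

    /**
     * Hypothetical sweep: removes every message whose deadline is strictly before nowNanos
     * (mirroring "timestamp < now") and returns them as one redelivery group.
     */
    static Set<Long> sweep(Map<Long, Long> deadlineByMessage, long nowNanos) {
        Set<Long> toRedeliver = new TreeSet<>();
        for (Iterator<Map.Entry<Long, Long>> it = deadlineByMessage.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getValue() < nowNanos) {
                toRedeliver.add(e.getKey());
                it.remove();
            }
        }
        return toRedeliver;
    }
}
```

Because the sweep only runs once per timer interval, a message is redelivered at the first sweep after its deadline rather than exactly at the deadline.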

4.3 ServerCnx

Handle the redeliver command received from the client:

@Override
protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) {
    checkArgument(state == State.Connected);
    if (log.isDebugEnabled()) {
        log.debug("[{}] Received Resend Command from consumer {} ", remoteAddress, redeliver.getConsumerId());
    }
    CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
    if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
        Consumer consumer = consumerFuture.getNow(null);
        if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
            consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
        } else {
            consumer.redeliverUnacknowledgedMessages();
        }
    }
}
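The broker's branch comes down to one decision. If the command carries message IDs and the subscription acknowledges messages individually, the broker redelivers only the listed messages; otherwise it redelivers everything the consumer has not acknowledged. The hypothetical sketch below assumes that the individual-ack modes are Shared and Key_Shared, mirroring the client-side check in ConsumerImpl. The enums are illustrative rather than Pulsar's own types.

```java
import java.util.List;

class RedeliverDecisionDemo {
    /** Illustrative copy of the subscription types, not Pulsar's enum. */
    enum SubType { Exclusive, Failover, Shared, Key_Shared }

    enum Action { REDELIVER_LISTED, REDELIVER_ALL }

    public static void main(String[] args) {
        System.out.println(decide(SubType.Shared, List.of(11L, 12L)));   // REDELIVER_LISTED
        System.out.println(decide(SubType.Failover, List.of(11L, 12L))); // REDELIVER_ALL
        System.out.println(decide(SubType.Shared, List.of()));           // REDELIVER_ALL
    }

    /** Assumption: only Shared and Key_Shared acknowledge messages individually. */
    static boolean isIndividualAckMode(SubType type) {
        return type == SubType.Shared || type == SubType.Key_Shared;
    }

    /** Hypothetical decision mirroring the if/else in handleRedeliverUnacknowledged. */
    static Action decide(SubType type, List<Long> messageIds) {
        return !messageIds.isEmpty() && isIndividualAckMode(type)
                ? Action.REDELIVER_LISTED
                : Action.REDELIVER_ALL;
    }
}
```

This is also why nacks on an Exclusive or Failover subscription cause all unacknowledged messages of the consumer to come back, not just the nacked one.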

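4.4 Consumer

On the broker, remove the messages from the consumer's pending acks and hand their positions to the subscription for redelivery: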

public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
    int totalRedeliveryMessages = 0;
    List<PositionImpl> pendingPositions = Lists.newArrayList();
    for (MessageIdData msg : messageIds) {
        PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId());
        LongPair batchSize = pendingAcks.get(position.getLedgerId(), position.getEntryId());
        if (batchSize != null) {
            pendingAcks.remove(position.getLedgerId(), position.getEntryId());
            totalRedeliveryMessages += batchSize.first;
            pendingPositions.add(position);
        }
    }
    addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
    blockedConsumerOnUnackedMsgs = false;
    if (log.isDebugEnabled()) {
        log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", topicName, subscription, consumerId,
                totalRedeliveryMessages, pendingPositions.size());
    }
    subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
    msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
    int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
    // if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
    if (numberOfBlockedPermits > 0) {
        MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added {} blockedPermits to broker.service.Consumer's messagePermits for consumer {}",
                    topicName, subscription, numberOfBlockedPermits, consumerId);
        }
        subscription.consumerFlow(this, numberOfBlockedPermits);
    }
}
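Stripped of metrics and permit handling, the method is bookkeeping. Each requested position still present in pendingAcks is removed, its batch size is added to the redelivery total, and the position is kept for the dispatcher. Positions that are no longer pending, for example because they were already acknowledged, are skipped. The unacked counter then drops by the total and the consumer is unblocked. The simplified sketch below uses a hypothetical Position record and a plain HashMap as stand-ins for PositionImpl and the real pendingAcks structure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingAcksDemo {
    public static void main(String[] args) {
        PendingAcksDemo consumer = new PendingAcksDemo();
        consumer.dispatched(new Position(5, 0), 1);
        consumer.dispatched(new Position(5, 1), 10); // a batch entry holding 10 messages
        consumer.blocked = true;
        List<Position> replay = consumer.redeliver(List.of(new Position(5, 1), new Position(5, 9)));
        System.out.println(replay + " unacked=" + consumer.unackedMessages + " blocked=" + consumer.blocked);
        // [Position[ledgerId=5, entryId=1]] unacked=1 blocked=false
    }

    /** Hypothetical stand-in for PositionImpl. */
    record Position(long ledgerId, long entryId) {}

    /** Entry position -> number of messages in that entry (the batch size). */
    final Map<Position, Integer> pendingAcks = new HashMap<>();
    int unackedMessages;
    boolean blocked;

    void dispatched(Position position, int batchSize) {
        pendingAcks.put(position, batchSize);
        unackedMessages += batchSize;
    }

    /** Returns the positions to hand to the dispatcher for replay. */
    List<Position> redeliver(List<Position> requested) {
        int totalRedelivered = 0;
        List<Position> pendingPositions = new ArrayList<>();
        for (Position position : requested) {
            Integer batchSize = pendingAcks.remove(position);
            if (batchSize != null) { // skip positions that are no longer pending
                totalRedelivered += batchSize;
                pendingPositions.add(position);
            }
        }
        unackedMessages -= totalRedelivered;
        blocked = false;
        return pendingPositions;
    }
}
```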

4.5 PersistentDispatcherMultipleConsumers

Add the positions to the replay set and trigger a read:

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
    positions.forEach(position -> {
        if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
            redeliveryTracker.addIfAbsent(position);
        }
    });
    if (log.isDebugEnabled()) {
        log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
    }
    readMoreEntries();
}
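addMessageToReplay records the position in the dispatcher's replay set. As far as I can tell from the dispatcher code, readMoreEntries then serves replayed positions before reading new entries from the cursor. The sketch below models that ordering, assuming a set sorted by (ledgerId, entryId) as the replay structure. The names nextBatch and readPosition are hypothetical, and the backlog is a single ledger for simplicity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

class ReplayFirstDemo {
    public static void main(String[] args) {
        ReplayFirstDemo dispatcher = new ReplayFirstDemo(new Position(3, 10));
        dispatcher.addMessageToReplay(new Position(3, 4));
        dispatcher.addMessageToReplay(new Position(3, 2));
        dispatcher.addMessageToReplay(new Position(3, 4)); // already queued: returns false
        System.out.println(dispatcher.nextBatch(3)); // replayed (3,2) and (3,4) only
        System.out.println(dispatcher.nextBatch(2)); // then new entries (3,10) and (3,11)
    }

    /** Hypothetical stand-in for PositionImpl. */
    record Position(long ledgerId, long entryId) {}

    /** Replay positions kept sorted by (ledgerId, entryId). */
    private final TreeSet<Position> replay = new TreeSet<>(
            Comparator.comparingLong(Position::ledgerId).thenComparingLong(Position::entryId));
    /** Hypothetical read cursor: next new entry in the backlog. */
    private Position readPosition;

    ReplayFirstDemo(Position readPosition) {
        this.readPosition = readPosition;
    }

    /** True if newly added, like the result checked in redeliverUnacknowledgedMessages. */
    boolean addMessageToReplay(Position position) {
        return replay.add(position);
    }

    /** Serves only replayed positions while any exist; otherwise reads new entries. */
    List<Position> nextBatch(int maxEntries) {
        List<Position> batch = new ArrayList<>();
        if (!replay.isEmpty()) {
            while (batch.size() < maxEntries && !replay.isEmpty()) {
                batch.add(replay.pollFirst());
            }
            return batch;
        }
        for (int i = 0; i < maxEntries; i++) {
            batch.add(readPosition);
            readPosition = new Position(readPosition.ledgerId(), readPosition.entryId() + 1);
        }
        return batch;
    }
}
```

The first call returns only the two replayed positions, in sorted order. New entries are read only once the replay set is empty.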

5. Summary

This post only covers how redelivery is handled. Exactly how the broker and the consumer communicate still needs further study…

Author: iMine
Link: https://imine141.github.io/2021/08/04/pulsar/pulsar%E5%A4%B1%E8%B4%A5%E9%87%8D%E8%AF%95%EF%BC%88negativeAckRedeliveryDelay%EF%BC%89%E7%9A%84%E8%A7%A3%E9%87%8A/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.