Notes on a Deadlock Caused by CompletableFuture

1. Background

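To add support for read-only brokers to Pulsar, I modified the pulsar-broker code and added a ZooKeeper (zk) query that runs when a cursor is created.

While testing, I found that every time a topic was re-created and a consumer started subscribing, the broker hung, so the subscribe step never succeeded.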

2. Problem analysis

The consumer interacts with Pulsar in the following stages:

  • CONNECT

  • PARTITIONED_METADATA

  • LOOKUP

  • SUBSCRIBE

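After sending SUBSCRIBE, the consumer never received a reply from the broker, which confirmed that the broker was stuck while handling SUBSCRIBE.

2.1 Locating the code

The Pulsar source shows that all zk metadata queries are executed asynchronously, and that they all share a single thread: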

// AbstractMetadataStore constructor
protected AbstractMetadataStore() {
    this.executor = Executors
            .newSingleThreadExecutor(new DefaultThreadFactory("metadata-store"));
    registerListener(this);
    // other code
}

// Runs a query or a callback
/**
 * Run the task in the executor thread and fail the future if the executor is shutting down
 */
protected void execute(Runnable task, CompletableFuture<?> future) {
    try {
        executor.execute(task);
    } catch (Throwable t) {
        future.completeExceptionally(t);
    }
}
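To make the consequence of a single shared thread concrete, here is a minimal, self-contained sketch of the same pattern. SingleThreadStore and its get() method are hypothetical stand-ins, not Pulsar classes: every query result is delivered on one thread named metadata-store, and execute() fails the future when the executor rejects the task, mirroring the method above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for AbstractMetadataStore's executor handling (not Pulsar code).
public class SingleThreadStore {
    // One thread delivers every query result, like the "metadata-store" executor.
    final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "metadata-store"));

    // Same shape as AbstractMetadataStore.execute(): if the executor rejects the
    // task (for example after shutdown), fail the future instead of leaving it pending.
    void execute(Runnable task, CompletableFuture<?> future) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
    }

    // Hypothetical async read: the result is always completed on the metadata-store thread.
    CompletableFuture<String> get(String path) {
        CompletableFuture<String> future = new CompletableFuture<>();
        execute(() -> future.complete(path + " read on " + Thread.currentThread().getName()), future);
        return future;
    }

    public static void main(String[] args) {
        SingleThreadStore store = new SingleThreadStore();
        System.out.println(store.get("/a").join()); // /a read on metadata-store
        store.executor.shutdown();
        // After shutdown the executor rejects the task, so the future fails right away.
        System.out.println(store.get("/b").isCompletedExceptionally()); // true
    }
}
```

Because there is only one thread, a callback that blocks also stops every other query result from being delivered; that property is what matters for the hang described below.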

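So I suspected that some callback had not finished and was holding up that thread, and added logging to every callback. This showed that the following code blocks: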

@Override
public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,
        EnumSet<CreateOption> options) {
    boolean hasVersion = optExpectedVersion.isPresent();
    int expectedVersion = optExpectedVersion.orElse(-1L).intValue();

    CompletableFuture<Stat> future = new CompletableFuture<>();

    try {
        if (hasVersion && expectedVersion == -1) {
            CreateMode createMode = getCreateMode(options);
            ZkUtils.asyncCreateFullPathOptimistic(zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    createMode, (rc, path1, ctx, name) -> {
                        execute(() -> {
                            Code code = Code.get(rc);
                            if (code == Code.OK) {
                                // Execution gets stuck here: complete() never returns.
                                future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));
                            } else if (code == Code.NODEEXISTS) {
                                // We're emulating a request to create node, so the version is invalid
                                future.completeExceptionally(getException(Code.BADVERSION, path));
                            } else {
                                future.completeExceptionally(getException(code, path));
                            }
                        }, future);
                    }, null);
        } else {
            // other code...
        }
    } catch (Throwable t) {
        future.completeExceptionally(new MetadataStoreException(t));
    }

    return future;
}
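Why would complete() block at all? CompletableFuture.complete() runs the non-async dependent stages (thenApply, thenAccept, thenCompose, ...) already registered on the future synchronously, on the thread that calls complete(), and returns only after they have run. The sketch below (class and method names are hypothetical) registers a thenAccept stage and then completes the future on a thread named metadata-store:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class CompleteRunsDependents {
    // Returns the name of the thread that ran the dependent stage.
    static String dependentThreadName() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "metadata-store"));
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        // Registered before the future completes, like the subscribe logic
        // chained onto the metadata future.
        CompletableFuture<Void> dependent =
                future.thenAccept(v -> ranOn.set(Thread.currentThread().getName()));

        // complete() is called on the metadata-store thread; the thenAccept stage
        // runs inside this call, on the same thread, before complete() returns.
        executor.execute(() -> future.complete(true));

        dependent.join();
        executor.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(dependentThreadName()); // metadata-store
    }
}
```

The stack trace in section 2.3 shows exactly this chain on the real thread: complete() → postComplete → uniCompose → ... all on metadata-store-7-1. If any stage in that chain blocks, complete() blocks with it.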

2.2 Checking the process and CPU usage

Next, check the process and its CPU usage:

# Find the process ID

ps -ef | grep broker

# Show per-thread CPU usage of the process

top -Hp 16704


CPU usage was low, under 10%, so the hang was unlikely to be caused by GC or by an infinite loop.

2.3 Inspecting the thread dump

"metadata-store-7-1" #84 prio=5 os_prio=0 tid=0x00007f1560004800 nid=0x1ca1 waiting on condition [0x00007f1506a2c000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000fd0c3d18> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.recoverCursorFromZK(ManagedLedgerImpl.java:1072)
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:1019)
// lock held by this thread while it waits in get()
- locked <0x00000000f5841ce8> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:857)
at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:747)
at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:1050)
at org.apache.pulsar.broker.service.ServerCnx$$Lambda$277/1382611823.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$$Lambda$295/640790180.accept(Unknown Source)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
// where it blocks: the execute() callback that calls future.complete()
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$null$15(ZKMetadataStore.java:234)
at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$127/153703270.run(Unknown Source)
at org.apache.pulsar.metadata.impl.AbstractMetadataStore.lambda$execute$8(AbstractMetadataStore.java:261)
at org.apache.pulsar.metadata.impl.AbstractMetadataStore$$Lambda$128/1782705605.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

"main-EventThread" #82 daemon prio=5 os_prio=0 tid=0x00007f15edd8e800 nid=0x1ca0 waiting on condition [0x00007f150692c000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000f5840660> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:559)

"main-SendThread(172.20.140.23:2181)" #81 daemon prio=5 os_prio=0 tid=0x00007f15edd8a800 nid=0x1c9f runnable [0x00007f150672a000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000f571b3e8> (a sun.nio.ch.Util$3)
- locked <0x00000000f571b3f8> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000f571b3a0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:332)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)

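The dump shows that the metadata-store thread is neither waiting on ZooKeeper nor BLOCKED on a lock held by another thread; it is WAITING (parking) inside CompletableFuture.get(). The future.complete() call in the ZKMetadataStore callback synchronously ran the stages chained onto that future (postComplete → uniCompose → ServerCnx → PersistentTopic.subscribe → ManagedLedgerImpl.asyncOpenCursor), and these ended up in recoverCursorFromZK, where that same thread parks in get(). The ZooKeeper threads (main-EventThread, main-SendThread) are idle. The code involved is: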

private boolean recoverCursorFromZK(String cursorName, final OpenCursorCallback callback, final Object ctx) {
    CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
    store.asyncGetCursorInfo(name, cursorName, new MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
        @Override
        public void operationComplete(MLDataFormats.ManagedCursorInfo info, Stat stat) {
            existsFuture.complete(true);
        }

        @Override
        public void operationFailed(MetaStoreException e) {
            log.info("vvv method {} open failed", "recoverCursorFromZK");
            existsFuture.complete(false);
        }
    });

    try {
        // This get() is where the thread blocks
        if (!existsFuture.get()) {
            return false;
        }
    } catch (Exception e) {
        log.error("vvv method recoverCursorFromZK error", e);
        return false;
    }

    // other code

    return true;
}
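Since this get() runs on the metadata-store thread, one way to make the mistake visible during development, instead of as a silent hang, is a check that refuses to block on that thread. The helper joinOffThread below is hypothetical (it is not part of Pulsar or the JDK), and the sketch assumes the store thread can be recognized by its name:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingGuard {
    // Hypothetical helper (not in Pulsar): refuse to block on the thread that is
    // expected to complete the future, so the bug surfaces as an exception
    // instead of a silent hang.
    static <T> T joinOffThread(CompletableFuture<T> future, String storeThreadName) {
        if (!future.isDone() && Thread.currentThread().getName().equals(storeThreadName)) {
            throw new IllegalStateException("blocking on " + storeThreadName + " would deadlock");
        }
        return future.join();
    }

    // Mimics recoverCursorFromZK being reached from a callback on the store thread.
    static String blockOnStoreThread() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "metadata-store"));
        CompletableFuture<String> outcome = new CompletableFuture<>();
        executor.execute(() -> {
            CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
            // The completion is queued behind the task that is running right now.
            executor.execute(() -> existsFuture.complete(true));
            try {
                joinOffThread(existsFuture, "metadata-store");
                outcome.complete("no error");
            } catch (IllegalStateException e) {
                outcome.complete(e.getMessage());
            }
        });
        String result = outcome.join();
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(blockOnStoreThread()); // blocking on metadata-store would deadlock
    }
}
```

A name check is a crude heuristic; it only catches the direct case where the waiting thread is the one that must deliver the result.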

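3. Solution

Change recoverCursorFromZK so that it no longer calls get() to wait for the query to finish; instead, continue the remaining logic in a callback registered with CompletableFuture.thenAccept().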
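The text above only says that get() was replaced with thenAccept(). A minimal sketch of the idea, assuming the caller can accept a CompletableFuture instead of a plain boolean: queryAsync, recoverCursor and recoverOnStoreThread are hypothetical names, not Pulsar methods. The sketch uses thenApply(), the value-returning sibling of thenAccept(), so the caller receives the result (and any failure) as a future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NonBlockingRecover {
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "metadata-store"));

    // Hypothetical async query: like ZKMetadataStore, it completes its future
    // on the single metadata-store thread.
    CompletableFuture<Boolean> queryAsync(boolean exists) {
        CompletableFuture<Boolean> f = new CompletableFuture<>();
        executor.execute(() -> f.complete(exists));
        return f;
    }

    // Hypothetical non-blocking variant of recoverCursorFromZK: it returns a
    // future and continues once the query result arrives, instead of calling get().
    CompletableFuture<Boolean> recoverCursor(boolean exists) {
        return queryAsync(exists).thenApply(found -> {
            if (!found) {
                return false;
            }
            // ... the rest of the recovery logic would run here ...
            return true;
        });
    }

    // Starts the recovery from a task on the metadata-store thread, which is
    // where the blocking version deadlocked.
    boolean recoverOnStoreThread(boolean exists) {
        CompletableFuture<Boolean> recovered = new CompletableFuture<>();
        executor.execute(() -> recoverCursor(exists).whenComplete((v, t) -> {
            if (t != null) {
                recovered.completeExceptionally(t);
            } else {
                recovered.complete(v);
            }
        }));
        try {
            // Waiting here is safe: the caller is not the metadata-store thread.
            return recovered.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("recovery did not finish", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(new NonBlockingRecover().recoverOnStoreThread(true));  // true
        System.out.println(new NonBlockingRecover().recoverOnStoreThread(false)); // false
    }
}
```

Because nothing on the metadata-store thread waits for another task on that same thread, the queued query completion gets to run and the chain finishes.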

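4. Summary

At first this looked like resource contention, but the stack trace shows a self-deadlock on a single thread: the metadata-store thread blocks in get() waiting for a future whose completion depends on a task queued behind it on that same thread.

The overall flow is: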

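  • A zk query is issued, with a callback registered to run when the query completes.

  • When the query completes, the callback runs on the single metadata-store thread and calls future.complete().

  • complete() synchronously runs the stages chained onto that future, on the same thread. One of them (the subscribe path, via recoverCursorFromZK) issues another zk query and calls get() on its future. The callback that would complete that future must also run on the metadata-store thread, which is now blocked in get(), so it never runs and the thread waits forever.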

The flow can be reduced to the following:

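import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDeadlockDemo {

    public static void main(String[] args) throws Exception {
        // A single thread plays the role of the "metadata-store" executor.
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // f1 plays the role of the future completed in the storePut callback.
        CompletableFuture<Boolean> f1 = new CompletableFuture<>();

        // A stage chained onto f1, like the subscribe path that ends in recoverCursorFromZK.
        CompletableFuture<Boolean> subscribed = f1.thenApply(v -> {
            // Start another async query whose result is also delivered on the executor...
            CompletableFuture<Boolean> f2 = new CompletableFuture<>();
            executor.execute(() -> f2.complete(true));
            try {
                // ...and wait for it. The task that completes f2 is queued behind the
                // task running right now, so it cannot run while this thread waits.
                // A plain f2.get() would hang forever; the timeout lets the demo end.
                return f2.get(2, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                System.out.println("f2 timed out: the executor thread is waiting on itself");
                return false;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        System.out.println("init done");
        // Complete f1 on the executor thread, as the metadata-store callback does.
        // complete() runs the thenApply stage above synchronously on that thread.
        executor.execute(() -> f1.complete(true));

        System.out.println("run done, subscribed = " + subscribed.get());
        executor.shutdown();
    }
}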
Author: iMine
Link: https://imine141.github.io/2021/09/01/pulsar/%E3%80%90bug%E3%80%91%E8%AE%B0%E4%B8%80%E6%AC%A1CompletableFuture%E5%BC%95%E8%B5%B7%E7%9A%84%E6%AD%BB%E9%94%81%E9%97%AE%E9%A2%98/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.