// 执行查询或者回调函数 /** * Run the task in the executor thread and fail the future if the executor is shutting down */ protected void execute(Runnable task, CompletableFuture<?> future) { try { executor.execute(task); } catch (Throwable t) { future.completeExceptionally(t); } }
/**
 * Writes {@code value} to the ZooKeeper node at {@code path} (excerpt: only the
 * "create" branch is shown; the other branch is elided below).
 *
 * <p>When the expected version is present and equals {@code -1}, the node is created with
 * {@code ZkUtils.asyncCreateFullPathOptimistic} using the {@code CreateMode} derived from
 * {@code options}. If the node already exists, the future fails with a BADVERSION error,
 * since a create request is being emulated. Any other non-OK result code fails the future
 * with the corresponding error.
 *
 * <p>The ZooKeeper callback does not complete the future directly. It hands the result
 * processing to {@link #execute(Runnable, CompletableFuture)}, so the future is completed on
 * this store's executor thread.
 *
 * @param path               node path
 * @param value              node payload
 * @param optExpectedVersion expected node version, if any; {@code -1} requests node creation
 * @param options            create options, converted via {@code getCreateMode}
 * @return a future completed with the resulting {@code Stat}, or completed exceptionally on
 *         failure (synchronous errors are wrapped in {@code MetadataStoreException})
 */
@Override
public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,
        EnumSet<CreateOption> options) {
    boolean hasVersion = optExpectedVersion.isPresent();
    // Absent version is mapped to -1 here, but hasVersion keeps "absent" distinct from "-1".
    int expectedVersion = optExpectedVersion.orElse(-1L).intValue();

    CompletableFuture<Stat> future = new CompletableFuture<>();

    try {
        if (hasVersion && expectedVersion == -1) {
            CreateMode createMode = getCreateMode(options);
            ZkUtils.asyncCreateFullPathOptimistic(zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode,
                    (rc, path1, ctx, name) -> {
                        // Presumably invoked on the ZooKeeper client callback thread; the result is
                        // handed off to this store's executor before the future is completed.
                        execute(() -> {
                            Code code = Code.get(rc);
                            if (code == Code.OK) {
                                // Execution blocks here. complete() runs dependent (non-async) stages
                                // synchronously on the current executor thread. In the captured thread
                                // dump, one such stage (ManagedLedgerImpl.recoverCursorFromZK) parks in
                                // CompletableFuture.get(), so this thread never returns.
                                // NOTE(review): if that get() waits for work queued to this same
                                // executor, this is a self-deadlock; confirm against the callers.
                                future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));
                            } else if (code == Code.NODEEXISTS) {
                                // We're emulating a request to create node, so the version is invalid
                                future.completeExceptionally(getException(Code.BADVERSION, path));
                            } else {
                                future.completeExceptionally(getException(code, path));
                            }
                        }, future);
                    }, null);
        } else {
            // other code...
        }
    } catch (Throwable t) {
        // Synchronous failures while issuing the request are reported through the future.
        future.completeExceptionally(new MetadataStoreException(t));
    }

    return future;
}
2.2 查看进程和cpu使用情况
然后查看进程和 CPU 使用情况：
# Find the broker process ID
ps -ef |grep broker
# Show the CPU usage of the threads of that process.
# 16704 is presumably the broker PID found by the previous command; substitute your own.
top -Hp 16704
可以看到 CPU 使用率并不高，在 10% 以下。GC 频繁或死循环通常会使 CPU 占用明显升高，由此可推断阻塞并不是由 GC 或其他死循环造成的。
2.3 查看堆栈信息
"metadata-store-7-1" #84 prio=5 os_prio=0 tid=0x00007f1560004800 nid=0x1ca1 waiting on condition [0x00007f1506a2c000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000fd0c3d18> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.recoverCursorFromZK(ManagedLedgerImpl.java:1072) at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:1019) // 阻塞原因 - locked <0x00000000f5841ce8> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl) at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:857) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:747) at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:1050) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$277/1382611823.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage$$Lambda$295/640790180.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) // 阻塞代码 at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$null$15(ZKMetadataStore.java:234) at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$127/153703270.run(Unknown Source) at org.apache.pulsar.metadata.impl.AbstractMetadataStore.lambda$execute$8(AbstractMetadataStore.java:261) at org.apache.pulsar.metadata.impl.AbstractMetadataStore$$Lambda$128/1782705605.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)
"main-EventThread" #82 daemon prio=5 os_prio=0 tid=0x00007f15edd8e800 nid=0x1ca0 waiting on condition [0x00007f150692c000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000f5840660> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:559)
"main-SendThread(172.20.140.23:2181)" #81 daemon prio=5 os_prio=0 tid=0x00007f15edd8a800 nid=0x1c9f runnable [0x00007f150672a000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x00000000f571b3e8> (a sun.nio.ch.Util$3) - locked <0x00000000f571b3f8> (a java.util.Collections$UnmodifiableSet) - locked <0x00000000f571b3a0> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:332) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)