1. 版本
pulsar:2.8.0
fastjson:1.2.76
2. 问题
在pulsar项目中引入fastjson后,pulsar中某些admin api不能正常使用,会报NPE错误。
执行命令会报错:
[mac bin]$ ./pulsar-admin namespaces set-backlog-quota --limit 100M --limitTime 1111 --policy producer_request_hold sample/ns118:38:54.968 [AsyncHttpClient-7-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://x.x.x.x:8080/admin/v2/namespaces/sample/ns1/backlogQuota] Failed to perform http post request: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server ErrorHTTP 500 Internal Server Error |
查看pulsar的日志:
11:50:05.098 [pulsar-web-63-6] ERROR org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Failed to update backlog quota map for namespace sample/ns1org.apache.pulsar.metadata.api.MetadataStoreException: com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: org.apache.pulsar.common.policies.data.Policies["backlog_quota_map"]->java.util.LinkedHashMap["destination_storage"]->com.sun.proxy.$Proxy117["limitSize"]) at org.apache.pulsar.broker.resources.BaseResources.set(BaseResources.java:94) ~[org.apache.pulsar-pulsar-broker-common-2.8.0.jar:2.8.0] at org.apache.pulsar.broker.admin.impl.NamespacesBase.internalSetBacklogQuota(NamespacesBase.java:1387) ~[pulsar-broker-2.8.0.jar:2.8.0] at org.apache.pulsar.broker.admin.v2.Namespaces.setBacklogQuota(Namespaces.java:724) ~[pulsar-broker-2.8.0.jar:2.8.0] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_231] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_231] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_231] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_231] at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?] at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?] at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?] at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?] at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?] at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?] at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?] at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604] at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:65) ~[pulsar-broker-2.8.0.jar:2.8.0] at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1435) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1350) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:179) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:380) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604] at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:383) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231] |
虽然能明显看出来是json解析报错引起的空指针问题,但是不知道是哪一个步骤造成解析失败。
3. 问题解决
经过一系列的排查后,发现是fastjson自动注册provider引起的。
pulsar使用jersey框架作为restful接口的处理,同时使用jackson作为json解析框架,但是项目引入fastjson后,解析框架就会被替换成fast接送。
所以解决方法就是找到注册相关的代码,设置不自动注册。
查看fastjson中注册的代码:
// 这里继承jersey的AutoDiscoverable接口,并设置注册优先权高于jersey的默认权限,是为了优先注册fastjson的解析框架。 |
所以如果不想用fastjson,则需要设置自动注册为false:
System.setProperty(FastJsonAutoDiscoverable.FASTJSON_AUTO_DISCOVERABLE, "false"); |
这个是全局变量,所以需要在程序启动的时候设置,如果在jersey加载完之后在设置就没有效果了。
4. 问题排查
解决办法虽然很简单,一行代码就可以了,但是找问题却耗费了很长时间,下面记录下查找问题做的一些工作。
4.1 问题确认
出现问题的原因是我们添加了自己的业务逻辑,那么就要确认这个问题是新增代码引起的还是以前就有的bug。
所以首先需要把环境恢复成修改之前的环境,然后测试。
结果发现之前的环境没问题,所以确认出现问题的原因就是最近新加的代码。
4.2 在问题出现的地方找原因
通常情况下,对于新出现的问题,我们不可能马上明白原因是啥,不知道是之前哪一块逻辑修改造成的,尤其是代码可能不是自己写的。
所以,最简单的就是在问题出现地方的前后打日志,理清前后逻辑。
该问题出现在
org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl:
|
通过阅读该类的代码,可以发现:
该类缓存了一些从zookeeper中查询的数据,缓存使用的框架是Caffeine。
缓存中的数据会定时老化。
该类提供了数据修改方法,接收到修改指令后会更新到zookeeper中,然后更新缓存数据。
在上述方法中,接收到更新指令后,首先根据path查找缓存中是否存在,如果不存在则直接返回;如果存在,则获取数据和数据版本(对应zookeeper中的版本),把旧数据序列化后再反序列化(不知道意义是啥),然后把新数据保存到zookeeper中,保存成功后更新缓存数据。
缓存初始化的方法是readValueFromStore用于从zookeeper中查询数据。
打包后替换线上环境,并看下打印的日志:
11:50:05.095 [pulsar-web-63-6] INFO org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl - vvv method readModifyUpdate, path: /admin/policies/sample/ns1, class: class org.apache.pulsar.common.policies.data.Policies, value: Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={destination_storage={}}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=null, subscription_expiration_time_minutes=0, retention_policies=null, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=Full, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=true, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null), hash: -2056889852, expectedVersion: 0, p: 1890262240 |
可以看到backlog_quota_map={destination_storage={}}中destination_storage是一个空值,所以当序列化的时候会抛出NPE异常。
正常情况下的日志应该是:
10:49:44.719 [metadata-store-32-1] INFO org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl - vvv method readValueFromStore, path /admin/policies/sample/ns1, class class org.apache.pulsar.common.policies.data.Policies, value Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={destination_storage=BacklogQuotaImpl(limitSize=104857600, limitTime=1111, policy=producer_request_hold)}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=null, subscription_expiration_time_minutes=0, retention_policies=null, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=Full, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=true, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null), hash 1614905576, version: 1, p: 1843660571 |
destination_storage是有内容的,内容是:
BacklogQuotaImpl(limitSize=104857600, limitTime=1111, policy=producer_request_hold)。
由于这里的数据是从缓存中获取的,所以怀疑从缓存中拿到数据就是空,但是在readValueFromStore方法中打印日志后,发现这里的数据并不是空,而且内容和readModifyUpdate中不一样。所以就能确认在其他地方修改了缓存中的数据。
4.3 查看调用链
通过debug可以确认调用链是:
org.apache.pursar.broker.admin.v2.Namespaces -> setBacklogQuota
org.apache.pursar.broker.admin.impl.NamespaceBase -> internalSetBacklogQuota
org.apache.pulsar.broker.resources -> set
org.apache.pulsar.broker.resources -> setAsync
org.apache.pulsar.metadata.cache.impl -> readModifyUpdate
第一个方法就是提供了http接口,接收客户端传递的参数。
public void setBacklogQuota( String tenant, String namespace, |
此处打印的日志如下:
11:50:05.094 [pulsar-web-63-6] INFO org.apache.pulsar.broker.admin.v2.Namespaces - vvv method setBacklogQuota, backlogQuota {}, class com.sun.proxy.$Proxy117 {} |
第二个方法:
protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) { |
这个方法中,先是从MetaCacheImple对象中根据path拿到缓存中的policies,然后更新该policies,然后再通过MetaCacheImpl的update方法更新zookeeper中的数据。
通过上述流程,我们可以确定的是,缓存从zookeeper中加载的数据是正确的,客户端调用命令后传递的数据是错误的,所以定位到问题出现在调用链的第一个方法中。
4.4 问题分析
从前面的过程中,我们就能大致确认问题是客户端传递的参数没有被正常解析或者客户端传递的参数为空,然后pulsar本身使用jersey作为restful框架,使用jackson解析请求中的json数据,并对请求参数进行赋值。
首先通过tcpdump捕获交互过程中的数据包,确定客户端发送的数据中是有内容的:
可以看到传递的内容不为空,然后看不报错情况下的数据报文:
由此就可以确定不是客户端的问题,而是broker端接收到数据后的解析问题。
结合以往经验,推测是不同的json处理框架有冲突,导致数据解析失败。把fastjson去掉后,果然问题不再出现!
4.5 问题解决
既然是fastjson的问题,那么推测是加载fastjson包后,会在某些情况下替换jackson进行json数据的解析,导致解析出错。
pulsar是基于jersey作为restful框架的,而jersey遵循了javax-ws规范,规范中定义了加载哪些数据解析类,即定义了加载哪一个json框架作为解析框架。
搜索关键词:fastjson和jersey冲突,发现fastjson项目中的一个issue:
https://github.com/alibaba/fastjson/issues/1392
有人遇到了相似的问题,fastjson进行了修复。
我们的问题与上述问题还是有些不同的,但本质一样,都是由于fastjson默认加载了自己的作为java-ws的provider,导致jersey不能加载jackson。
而fastjson又不够强大,如果参数中的变量类型是interface,fastjson不能找到该interface的实现类并赋值,所以我们看到setBacklogQuota方法中backlogQuota是空的,而且class是com.sun.proxy.$Proxy117。
如果我们把BacklogQuota换成backlogQuotaImpl或者我们自己写的一个包含三个参数的类,则能成功赋值,例如这样:
|
打印结果:
12:14:01.371 [pulsar-web-63-12] INFO org.apache.pulsar.broker.admin.v2.Namespaces - vvv method setBacklogQuota, backlogQuota VVData(limitSize=104857600, limitTime=1111, policy=producer_request_hold |
5. 疑问
上边问题已经解决了,但是在找问题的过程中,发现可以通过其他地方修改Caffeine缓存中的数据。
这里在多线程同时操作情况下有出现数据不一致的问题吧
fastjson不会自动把自己设置为java-ws的provider,应该是jersey在启动的时候查找AutoDiscoverable接口的所有实现类,然后根据实现类里面设置的优先级加载优先级最高的provider,或者按照优先级顺序加载所有的provider。AutoDiscoverable的代码如下:
package org.glassfish.jersey.internal.spi; |