A data-parsing problem caused by a conflict between fastjson and jackson

1. Versions

pulsar: 2.8.0

fastjson: 1.2.76

2. Problem

After introducing fastjson into our Pulsar project, some of Pulsar's admin APIs stopped working and failed with NPE errors.

Running the following command reports an error:

[mac bin]$ ./pulsar-admin namespaces set-backlog-quota --limit 100M --limitTime 1111 --policy producer_request_hold sample/ns1
18:38:54.968 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://x.x.x.x:8080/admin/v2/namespaces/sample/ns1/backlogQuota] Failed to perform http post request: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
HTTP 500 Internal Server Error
Reason: HTTP 500 Internal Server Error
[mac bin]$

The Pulsar broker log shows:

11:50:05.098 [pulsar-web-63-6] ERROR org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Failed to update backlog quota map for namespace sample/ns1
org.apache.pulsar.metadata.api.MetadataStoreException: com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: org.apache.pulsar.common.policies.data.Policies["backlog_quota_map"]->java.util.LinkedHashMap["destination_storage"]->com.sun.proxy.$Proxy117["limitSize"])
  at org.apache.pulsar.broker.resources.BaseResources.set(BaseResources.java:94) ~[org.apache.pulsar-pulsar-broker-common-2.8.0.jar:2.8.0]
  at org.apache.pulsar.broker.admin.impl.NamespacesBase.internalSetBacklogQuota(NamespacesBase.java:1387) ~[pulsar-broker-2.8.0.jar:2.8.0]
  at org.apache.pulsar.broker.admin.v2.Namespaces.setBacklogQuota(Namespaces.java:724) ~[pulsar-broker-2.8.0.jar:2.8.0]
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_231]
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_231]
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_231]
  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_231]
  at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]
  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
  at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
  at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
  at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
  at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.34.jar:?]
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) ~[org.glassfish.jersey.core-jersey-server-2.34.jar:?]  at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.34.jar:?]  at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:65) ~[pulsar-broker-2.8.0.jar:2.8.0]  at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1435) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) ~[org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1350) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at 
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:179) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) ~[org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:380) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) [org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:383) [org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar:9.4.42.v20210604]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]  at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]

Although the log makes it obvious that the NullPointerException comes from a JSON (de)serialization error, it was not clear which step caused the parsing to fail.

3. Solution

After a long round of investigation, the cause turned out to be fastjson automatically registering itself as a provider.

Pulsar uses the Jersey framework to serve its RESTful interfaces and Jackson as its JSON (de)serialization framework. Once fastjson was added to the project, the JSON framework was silently replaced by fastjson.
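For context, this is roughly how Jackson is wired into a Jersey application. This is a generic illustration, not Pulsar's actual bootstrap code; the class name JerseyJacksonSetup is made up, and with jersey-media-json-jackson on the classpath JacksonFeature is normally picked up automatically rather than registered by hand.

import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.server.ResourceConfig;

// Generic Jersey illustration: register the Jackson feature and scan the resource package.
public class JerseyJacksonSetup {
    public static ResourceConfig config() {
        return new ResourceConfig()
                .register(JacksonFeature.class)              // Jackson as the JSON entity provider
                .packages("org.apache.pulsar.broker.admin"); // REST resources such as Namespaces
    }
}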

So the fix is to locate the registration code and disable automatic registration.

The registration code in fastjson looks like this:

// This class implements Jersey's AutoDiscoverable interface and declares a registration priority
// higher than Jersey's default, so fastjson's parsing framework gets registered first.
@Priority(AutoDiscoverable.DEFAULT_PRIORITY - 1)
public class FastJsonAutoDiscoverable implements AutoDiscoverable {

    public static final String FASTJSON_AUTO_DISCOVERABLE = "fastjson.auto.discoverable";

    public volatile static boolean autoDiscover = true;

    static {
        // Read the system property that controls auto-discovery
        try {
            autoDiscover = Boolean.parseBoolean(
                    System.getProperty(FASTJSON_AUTO_DISCOVERABLE, String.valueOf(autoDiscover)));
        } catch (Throwable ex) {
            // skip
        }
    }

    public void configure(final FeatureContext context) {
        final Configuration config = context.getConfiguration();

        // Register FastJson: only if auto-discovery is enabled and FastJsonFeature has not been registered yet.
        if (!config.isRegistered(FastJsonFeature.class) && autoDiscover) {
            context.register(FastJsonFeature.class);
        }
    }
}

So if you do not want fastjson to take over, set auto-discovery to false:

System.setProperty(FastJsonAutoDiscoverable.FASTJSON_AUTO_DISCOVERABLE, "false");

This is a global static flag, so it has to be set when the program starts; setting it after Jersey has finished loading has no effect.
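A minimal sketch of where such a call belongs (the entry-point class BrokerBootstrap is made up; the only point is that the property must be set before Jersey scans for AutoDiscoverable implementations):

public class BrokerBootstrap {
    public static void main(String[] args) throws Exception {
        // Must run before Jersey initializes, otherwise FastJsonAutoDiscoverable has already fired.
        System.setProperty("fastjson.auto.discoverable", "false");
        // ... start the embedded web server / broker afterwards ...
    }
}

Equivalently, since this is a plain system property, it can be passed as a JVM option (-Dfastjson.auto.discoverable=false) so that it is in effect before any application code runs.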

4. Troubleshooting

The fix itself is simple, just one line of code, but finding the problem took a long time. Below is a record of the work done to track it down.

4.1 Confirming the problem

The problem appeared after we added our own business logic, so the first thing to confirm was whether it was introduced by the newly added code or was a pre-existing bug.

So the first step was to roll the environment back to the state before our changes and test again.

The old environment worked fine, which confirmed that the problem was caused by the recently added code.

4.2 Looking for the cause at the point of failure

Usually, for a newly appeared problem, we cannot tell right away what the cause is or which earlier change is responsible, especially when the code was not written by ourselves.

The simplest approach is therefore to add logging before and after the place where the problem occurs and work out the surrounding logic.

The problem occurs in org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl:


@Override
public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyFunction) {
    log.info("vvv method {}, path {}", "readModifyUpdate0", path);
    return executeWithRetry(() -> objCache.get(path)
            .thenCompose(optEntry -> {
                if (!optEntry.isPresent()) {
                    return FutureUtils.exception(new NotFoundException(""));
                }

                CacheGetResult<T> entry = optEntry.get();
                T currentValue = entry.getValue();

                long expectedVersion = optEntry.get().getStat().getVersion();

                log.info("vvv method {}, path: {}, class: {}, value: {}, hash: {}, expectedVersion: {}, p: {}",
                        "readModifyUpdate", path, currentValue.getClass(), currentValue, currentValue.hashCode(),
                        expectedVersion, this.hashCode());

                if (currentValue instanceof Policies) {
                    Policies p = (Policies) currentValue;
                    if (p.backlog_quota_map.size() > 0) {
                        p.backlog_quota_map.forEach((key, value) -> {
                            log.info("vvv method {}, path {}, key {}, value {} {}", "readModifyUpdate", path, key,
                                    value.getClass(), value);
                        });
                    }
                }

                T newValueObj;
                byte[] newValue;
                try {
                    // Use clone and CAS zk to ensure thread safety
                    // The problem shows up on the next line
                    currentValue = serde.deserialize(serde.serialize(currentValue));
                    // modifyFunction.apply receives the (cloned) current value
                    newValueObj = modifyFunction.apply(currentValue);
                    log.info("vvv method {}, path: {}, class: {}, newValue: {}", "readModifyUpdate", path,
                            newValueObj.getClass(), newValueObj);

                    newValue = serde.serialize(newValueObj);
                } catch (Throwable t) {
                    return FutureUtils.exception(t);
                }

                return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(stat -> {
                    // Make sure we have the value cached before the operation is completed
                    log.info("vvv method {}, path {}", "readModifyUpdate-put", path);
                    objCache.put(path,
                            FutureUtils.value(Optional.of(new CacheGetResult<>(newValueObj, stat))));
                }).thenApply(__ -> newValueObj);
            }), path);
}

Reading through this class, we can see that:

  • The class caches some data read from ZooKeeper; the caching framework is Caffeine (see the sketch after this list).

  • Cached entries expire after a while.

  • The class provides a modify method: when it receives an update it writes the change to ZooKeeper and then refreshes the cached entry.

  • In the method above, when an update request arrives, it first checks whether the path exists in the cache and returns immediately if it does not; if it does, it takes the value and its version (the corresponding ZooKeeper version), serializes and then deserializes the old value (the purpose was not obvious at first; the in-line comment suggests it is meant as a clone for thread safety), writes the new value to ZooKeeper, and updates the cache once the write succeeds.

  • The cache loader method is readValueFromStore, which reads the data from ZooKeeper.
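A rough sketch of that caching pattern (illustration only, not Pulsar's actual code; the class name PoliciesCacheSketch, the loadFromZk helper and the five-minute expiry are made up):

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.common.policies.data.Policies;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class PoliciesCacheSketch {

    private final AsyncLoadingCache<String, Optional<Policies>> objCache =
            Caffeine.newBuilder()
                    .expireAfterWrite(5, TimeUnit.MINUTES)              // cached entries age out over time
                    .buildAsync((path, executor) -> loadFromZk(path));  // plays the role of readValueFromStore

    // Stand-in for the ZooKeeper read; in Pulsar this is an async metadata-store lookup.
    private CompletableFuture<Optional<Policies>> loadFromZk(String path) {
        return CompletableFuture.completedFuture(Optional.empty());
    }
}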

After rebuilding and deploying the instrumented package to the running environment, the logs show:

11:50:05.095 [pulsar-web-63-6] INFO  org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl - vvv method readModifyUpdate, path: /admin/policies/sample/ns1, class: class org.apache.pulsar.common.policies.data.Policies, value: Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={destination_storage={}}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=null, subscription_expiration_time_minutes=0, retention_policies=null, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=Full, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=true, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null), hash: -2056889852, expectedVersion: 0, p: 1890262240

We can see that in backlog_quota_map={destination_storage={}} the destination_storage entry is an empty value, so serializing it throws the NPE.

Under normal circumstances the log should look like this:

10:49:44.719 [metadata-store-32-1] INFO  org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl - vvv method readValueFromStore, path /admin/policies/sample/ns1, class class org.apache.pulsar.common.policies.data.Policies, value Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={destination_storage=BacklogQuotaImpl(limitSize=104857600, limitTime=1111, policy=producer_request_hold)}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=null, subscription_expiration_time_minutes=0, retention_policies=null, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=Full, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=true, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null), hash 1614905576, version: 1, p: 1843660571

Here destination_storage has content:

BacklogQuotaImpl(limitSize=104857600, limitTime=1111, policy=producer_request_hold)。

Since the data here comes from the cache, the first suspicion was that the cached value itself was already empty. But after adding a log line in readValueFromStore, the value there turned out not to be empty, and it differed from what readModifyUpdate saw. So the cached data must have been modified somewhere else.

4.3 Examining the call chain

Debugging shows the call chain is:

  • org.apache.pulsar.broker.admin.v2.Namespaces -> setBacklogQuota

  • org.apache.pulsar.broker.admin.impl.NamespacesBase -> internalSetBacklogQuota

  • org.apache.pulsar.broker.resources.BaseResources -> set

  • org.apache.pulsar.broker.resources.BaseResources -> setAsync

  • org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl -> readModifyUpdate

The first method is the HTTP endpoint that receives the parameters passed by the client.

public void setBacklogQuota(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
        @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
        @ApiParam(value = "Backlog quota for all topics of the specified namespace")
                BacklogQuota backlogQuota) {
    validateNamespaceName(tenant, namespace);
    log.info("vvv method {}, backlogQuota {}, class {}", "setBacklogQuota", backlogQuota, backlogQuota.getClass());
    internalSetBacklogQuota(backlogQuotaType, backlogQuota);
}

The log printed there is:

11:50:05.094 [pulsar-web-63-6] INFO  org.apache.pulsar.broker.admin.v2.Namespaces - vvv method setBacklogQuota, backlogQuota {}, class com.sun.proxy.$Proxy117 {}

The second method:

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
    validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
    validatePoliciesReadOnlyAccess();
    final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
            : BacklogQuotaType.destination_storage;
    try {
        final String path = path(POLICIES, namespaceName.toString());
        // Fetch the cached value from MetadataCacheImpl
        Policies policies = namespaceResources().get(path)
                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist"));
        RetentionPolicies r = policies.retention_policies;
        if (r != null) {
            Policies p = new Policies();
            p.backlog_quota_map.put(quotaType, backlogQuota);
            if (!checkQuotas(p, r)) {
                log.warn(
                        "[{}] Failed to update backlog configuration"
                                + " for namespace {}: conflicts with retention quota",
                        clientAppId(), namespaceName);
                new RestException(Status.PRECONDITION_FAILED,
                        "Backlog Quota exceeds configured retention quota for namespace."
                                + " Please increase retention quota and retry");
            }
        }
        // Modify the cached data in place (the cached object is mutated directly, which is why the value
        // seen in readModifyUpdate differs from what the cache loader readValueFromStore originally returned).
        policies.backlog_quota_map.put(quotaType, backlogQuota);
        log.info("vvv method {}, path {}, policies {}", "internalSetBacklogQuota", path, policies);
        namespaceResources().set(path, p -> policies);
        log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName,
                jsonMapper().writeValueAsString(backlogQuota));

    } catch (RestException pfe) {
        throw pfe;
    } catch (Exception e) {
        log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e);
        throw new RestException(e);
    }
}

In this method, the cached policies for the path are first fetched from the MetadataCacheImpl object, that policies object is then updated, and finally the data is written back through MetadataCacheImpl's update method, which persists it to ZooKeeper.

From this flow we can be sure that the data the cache loads from ZooKeeper is correct and the data passed in by the client command is wrong, so the problem must be in the first method of the call chain.

4.4 Analysis

From the steps above we can roughly conclude that either the client-supplied parameter was not parsed correctly or the client sent an empty parameter. Pulsar itself uses Jersey as its RESTful framework and Jackson to parse the JSON in the request and bind it to the request parameters.

First, capture the packets exchanged between client and broker with tcpdump to confirm that the data sent by the client actually has content:
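The capture command was something along these lines (illustrative only; the interface name is an assumption, and 8080 is the admin web port seen in the error log above):

tcpdump -i any -A -s 0 'tcp port 8080'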

[Screenshot: tcpdump capture of the failing request, showing a non-empty POST body]

The payload sent by the client is clearly not empty. Next, look at the packets in the case that does not fail:

[Screenshot: tcpdump capture of the same request in the working case]

This rules out the client; the problem is in how the broker parses the data after receiving it.

Based on past experience, the guess was that different JSON frameworks were conflicting and causing the parsing to fail. Sure enough, after removing fastjson the problem no longer appeared.

4.5 Fixing the problem

Since fastjson is the culprit, the assumption is that once the fastjson jar is loaded it replaces Jackson as the JSON parser in certain situations, and the parsing then fails.

Pulsar builds its RESTful layer on Jersey, and Jersey follows the JAX-RS (javax.ws.rs) specification, which defines which entity-provider classes are loaded, i.e., which JSON framework is used for parsing.

Searching for "fastjson Jersey conflict" turns up an issue in the fastjson project:

https://github.com/alibaba/fastjson/issues/1392

Someone ran into a similar problem, and fastjson shipped a fix for it.

Our problem is somewhat different from that one, but the essence is the same: fastjson registers itself as the JAX-RS provider by default, which prevents Jersey from loading Jackson.

And fastjson is not capable enough here: when a parameter's declared type is an interface, fastjson cannot locate an implementation class to instantiate and populate, which is why the backlogQuota seen in setBacklogQuota is empty and its class is com.sun.proxy.$Proxy117.
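A standalone sketch of that failure mode (my own reproduction, not Pulsar or fastjson code; Quota and ProxyNpeDemo are made-up stand-ins for BacklogQuota and the empty proxy the broker received): Jackson serializes the proxy through its getters, and a JDK proxy whose handler returns null for a getter with a primitive return type throws a NullPointerException when that null is unboxed, which Jackson then reports as "(was java.lang.NullPointerException)" with the reference chain ending at ["limitSize"], just as in the broker log above.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Proxy;

public class ProxyNpeDemo {

    public interface Quota {
        long getLimitSize();   // primitive return type, like BacklogQuota's limitSize
    }

    public static void main(String[] args) throws Exception {
        Quota empty = (Quota) Proxy.newProxyInstance(
                Quota.class.getClassLoader(),
                new Class<?>[]{Quota.class},
                (proxy, method, methodArgs) -> null);   // handler has no data, so every getter yields null

        // Unboxing the null return value throws NPE, which Jackson wraps in a JsonMappingException.
        new ObjectMapper().writeValueAsString(empty);
    }
}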

If we change BacklogQuota to BacklogQuotaImpl, or to a class of our own containing the same three fields, the value is bound correctly, for example:


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.common.policies.data.BacklogQuota;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VVData {
    private long limitSize;
    // backlog quota by time in second
    private int limitTime;
    private BacklogQuota.RetentionPolicy policy;
}

The log then prints:

12:14:01.371 [pulsar-web-63-12] INFO  org.apache.pulsar.broker.admin.v2.Namespaces - vvv method setBacklogQuota, backlogQuota VVData(limitSize=104857600, limitTime=1111, policy=producer_request_hold

5. Open questions

The problem above is solved, but while tracking it down I noticed that the data held in the Caffeine cache can be modified from other places.

Wouldn't this lead to inconsistent data when multiple threads operate on it at the same time?

fastjson does not force itself in as the JAX-RS provider; rather, when Jersey starts it looks up all implementations of the AutoDiscoverable interface and then, based on the priority declared on each implementation, loads the highest-priority provider first (or loads all providers in priority order). The AutoDiscoverable code is as follows:

package org.glassfish.jersey.internal.spi;

import javax.ws.rs.core.FeatureContext;

/**
 * A service provider contract for JAX-RS and Jersey components that need to be automatically discovered and registered in
 * {@link javax.ws.rs.core.Configuration runtime configurations}.
 * <p/>
 * A component implementing this contract becomes auto-discoverable by adding a new entry with fully qualified name of its
 * implementation class name to a {@code org.glassfish.jersey.internal.spi.AutoDiscoverable} file in the
 * {@code META-INF/services} directory.
 * <p/>
 * Almost all Jersey {@code AutoDiscoverable} implementations have
 * {@link #DEFAULT_PRIORITY} {@link javax.annotation.Priority priority} set.
 *
 * @author Michal Gajdos
 */
public interface AutoDiscoverable {

    /**
     * Default common priority of Jersey build-in auto-discoverables.
     * Use lower number on your {@code AutoDiscoverable} implementation to run it before Jersey auto-discoverables
     * and vice versa.
     *
     * Priority: fastjson declares (DEFAULT_PRIORITY - 1), so it is loaded before Jersey's own auto-discoverables.
     */
    public static final int DEFAULT_PRIORITY = 2000;

    /**
     * A call-back method called when an auto-discoverable component is to be configured in a given runtime configuration scope.
     * <p>
     * Note that as with {@link javax.ws.rs.core.Feature JAX-RS features}, before registering new JAX-RS components in a
     * given configurable context, an auto-discoverable component should verify that newly registered components are not
     * already registered in the configurable context.
     * </p>
     *
     * @param context configurable context in which the auto-discoverable should be configured.
     */
    public void configure(FeatureContext context);
}
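As the javadoc above says, an implementation opts in through the JDK service-loader mechanism: the jar ships a file under META-INF/services named after the contract, listing the implementation class. For fastjson that entry looks roughly like the following (the fully qualified class name is my recollection of fastjson's package layout; verify against the jar version you actually use):

META-INF/services/org.glassfish.jersey.internal.spi.AutoDiscoverable:
    com.alibaba.fastjson.support.jaxrs.FastJsonAutoDiscoverable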
Author: iMine
Link: https://imine141.github.io/2021/09/14/pulsar/%E3%80%90bug%E3%80%91fastjson%E5%92%8Cjackson%E5%86%B2%E7%AA%81%E9%80%A0%E6%88%90%E7%9A%84%E6%95%B0%E6%8D%AE%E8%A7%A3%E6%9E%90%E9%97%AE%E9%A2%98/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stated otherwise.