1. 概述 本系列文章准备详细的说明pulsar的认证和授权流程,并结合市场上常见的主-主账号授权、主-子账号授权需求提供解决办法。
2. 整体结构 首先我们需要对pulsar有一个整体的认识,pulsar本身是一个分布式部署的消息订阅分发中间件,它主要包含三个组件:
zookeeper:负责配置管理
broker:对外提供服务,接收发布订阅请求
bookkeeper:负责消息的持久化
3. 通信方式 pulsar提供了两种服务,一种是http的,主要负责处理创建tenant、namespace、topic和获取集群状态等信息;另外一种是基于netty写的tcp服务,主要用于接收客户端发送的消息和订阅请求。
http服务都在org.apache.pulsar.broker.admin包下,可以看到对于cluster、broker、tenant、namespace等都提供了对应的api操作接口。
tcp服务的核心处理类是org.apache.pulsar.broker.service.ServerCnx。
消息格式为TLV格式的二进制数据,每条消息的前4个字节为消息的总长度。
看下具体的消息格式:
4. 认证处理逻辑 客户端和broker之间是TCP长连接,在客户端连接到broker后,broker会对客户端的合法性进行验证,如果不合法则断开连接,判断流程是:
判断是否开启了连接认证(conf/broker.conf:authenticationEnabled)
如果没有开启认证,则返回连接成功
如果开启了认证,则获取认证内容、认证客户端选择的服务端认证策略
如果服务端不支持客户端提供的认证策略,则返回认证失败
如果支持,则调用策略的认证方法(AuthenticationProvider.authenticate)
如果认证过程没有抛出异常,则返回认证成功,并保存认证结果(这里的认证结果就是role,后续会根据role对客户端的其他操作进行授权管理)
认证过程完成。
认证处理包含几个重要的接口
@Override public void initialize (ServiceConfiguration config) throws IOException { } @Override public String getAuthMethodName () { } @Override public String authenticate (AuthenticationDataSource authData) throws AuthenticationException { } @Override public void close () throws IOException { }
Authentication
@Override public String getAuthMethodName () { } @Override public AuthenticationDataProvider getAuthData () throws PulsarClientException { } @Override public void configure (Map<String, String> authParams) { } @Override public void start () throws PulsarClientException { } @Override public void close () throws IOException { }
AuthenticationDataProvider
@Override public boolean hasDataForHttp () { } @Override public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception { } @Override public boolean hasDataFromCommand () { } @Override public String getCommandData () { }
5. 授权处理逻辑 授权需要在客户端连接认证的基础上进行,根据连接认证完成后生成的role对后续的操作进行权限管理。
授权管理包含两部分:
使用pulsar-admin管理集群时的权限验证(http协议)
客户端连接到broker进行数据读写时的权限验证(tcp协议)
上述两部分授权的处理逻辑其实没有区别,都是先通过连接认证获取获取客户端的role,然后根据role判断客户端是否有对应操作的权限。由于上边已经说了role的获取途径,下面重点写下pulsar对于客户端的哪些操作进行了权限的判断。
6. 自定义认证和授权 自定义认证和授权只需要实现前两节中列出的接口即可。
7. 第三方认证和授权中心 为了能更好的对客户端的权限进行管理,需要一个统一的第三方认证中心,记录tenant、namespace和topic的信息以及用户信息,并建立起用户和操作之间的权限关系。这样,在客户端连接到broker后,broker就可以根据客户端传递的用户信息对客户端操作进行权限验证。
8. 具体实现(不包含真正的权限验证部分,只是为了测试整体流程) 8.1 实现broker端认证接口 package auth.server;import org.apache.pulsar.broker.ServiceConfiguration;import org.apache.pulsar.broker.authentication.AuthenticationDataSource;import org.apache.pulsar.broker.authentication.AuthenticationProvider;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.naming.AuthenticationException;import java.io.IOException;import java.util.Set;import java.util.concurrent.atomic.AtomicLong;public class VVAuthenticationProvider implements AuthenticationProvider { private static final Logger log = LoggerFactory.getLogger(VVAuthenticationProvider.class); private static final String methodName = "vv_auth_v2" ; private AtomicLong seq = new AtomicLong (); private String header = "vv_auth" ; @Override public void initialize (ServiceConfiguration config) throws IOException { log.info(methodName + " initialize" + ", seq=" + seq.incrementAndGet()); if (config == null ) { return ; } Set<String> superRoles = config.getSuperUserRoles(); if (superRoles == null ) { return ; } for (String role : superRoles) { log.info(methodName + " initialize " + role + ", seq=" + seq.incrementAndGet()); } } @Override public String getAuthMethodName () { log.info(methodName + " getAuthMethodName" ); return methodName; } @Override public String authenticate (AuthenticationDataSource authData) throws AuthenticationException { log.info(methodName + " authenticate" + ", seq=" + seq.incrementAndGet()); String roleToken = "unknown" ; if (authData.hasDataFromCommand()) { roleToken = authData.getCommandData(); } else if (authData.hasDataFromHttp()) { roleToken = authData.getHttpHeader(header); } else { throw new AuthenticationException ("Authentication data source does not have a role token" ); } log.info(methodName + " authenticate " + roleToken + ", seq=" + seq.incrementAndGet()); return roleToken; } @Override public void close () throws IOException { log.info(methodName + " close" + ", seq=" + seq.incrementAndGet()); } }
8.2 实现客户端认证接口 package auth.client; import org.apache.pulsar.client.api.Authentication;import org.apache.pulsar.client.api.AuthenticationDataProvider;import org.apache.pulsar.client.api.PulsarClientException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.util.Map;public class VVAuthentication implements Authentication { private static final Logger log = LoggerFactory.getLogger(VVAuthentication.class); private static final String methodName = "vv_auth_v2" ; @Override public String getAuthMethodName () { log.info(methodName + " getAuthMethodName" ); return methodName; } @Override public AuthenticationDataProvider getAuthData () throws PulsarClientException { log.info(methodName + " getAuthData" ); return new VVAuthenticationDataProvider (); } @Override public void configure (Map<String, String> authParams) { log.info(methodName + " configure" ); if (authParams == null ) { return ; } authParams.forEach((key, value) -> { log.info(methodName + " configure " + key + "=" + value); }); } @Override public void start () throws PulsarClientException { log.info(methodName + " start" ); } @Override public void close () throws IOException { log.info(methodName + " close" ); } }
package auth.client;import org.apache.pulsar.client.api.AuthenticationDataProvider;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.HashMap;import java.util.Map;import java.util.Set;public class VVAuthenticationDataProvider implements AuthenticationDataProvider { private static final Logger log = LoggerFactory.getLogger(VVAuthenticationDataProvider.class); private static final String methodName = "vv_auth_v2" ; private String header = "vv_auth" ; private String token = "vv-role" ; @Override public boolean hasDataForHttp () { log.info(methodName + " hasDataForHttp" ); return true ; } @Override public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception { log.info(methodName + " getHttpHeaders" ); Map<String, String> headers = new HashMap <>(); headers.put(header, token); return headers.entrySet(); } @Override public boolean hasDataFromCommand () { log.info(methodName + " hasDataFromCommand" ); return true ; } @Override public String getCommandData () { log.info(methodName + " getCommandData" ); return token; } }
8.3 修改conf/broker.conf配置文件 ### --- Authentication --- ### # Enable authentication # 开启连接认证 authenticationEnabled=true # Authentication provider name list, which is comma separated list of class names # 自定义的broker端实现的处理类 authenticationProviders=auth.server.VVAuthenticationProvider # Interval of time for checking for expired authentication credentials authenticationRefreshCheckSeconds=60 # Enforce authorization # 开启授权认证 authorizationEnabled=true # Authorization provider fully qualified class-name # 自定义的授权认证处理类 authorizationProvider=auth.server.VVPulsarAuthorizationProvider # Allow wildcard matching in authorization # (wildcard matching only applicable if wildcard-char: # * presents at first or last position eg: *.pulsar.service, pulsar.service.*) authorizationAllowWildcardsMatching=false # Role names that are treated as "super-user", meaning they will be able to do all admin # operations and publish/consume from all topics # 超级用户,创建tenant的时候需要超级用户 superUserRoles=vv-role,cc-role # Authentication settings of the broker itself. Used when the broker connects to other brokers, # either in same or other clusters brokerClientTlsEnabled=false # 自定义的客户端认证实现类 brokerClientAuthenticationPlugin=auth.client2.client.VVAuthentication brokerClientAuthenticationParameters= brokerClientTrustCertsFilePath=
8.4 重启broker ./bin/pulsar-daemon stop broker
./bin/pulsar-daemon start broker
8.5 验证是否有效 通过pulsar-admin命令进行验证
首先配置conf/client.conf文件
authPlugin=auth.client2.client.VVAuthenticationauthParams=
执行命令并验证结果
# 执行以下命令./bin/pulsar-admin tenants list# 客户端侧输出日志18:21:52.173 [main] INFO auth.client.VVAuthentication - vv_auth_v2 configure18:21:52.175 [main] INFO auth.client.VVAuthentication - vv_auth_v2 configure token=123456vv18:21:52.335 [main] INFO auth.client.VVAuthentication - vv_auth_v2 getAuthMethodName18:21:52.336 [main] INFO auth.client.VVAuthentication - vv_auth_v2 start18:21:52.611 [main] INFO auth.client.VVAuthentication - vv_auth_v2 getAuthData18:21:52.612 [main] INFO auth.client.VVAuthenticationDataProvider - vv_auth_v2 hasDataForHttp18:21:52.789 [main] INFO auth.client.VVAuthenticationDataProvider - vv_auth_v2 hasDataForHttp18:21:52.789 [main] INFO auth.client.VVAuthenticationDataProvider - vv_auth_v2 getHttpHeaders# 服务端(broker)侧输出日志18:23:53.957 [pulsar-web-41-11] INFO auth.server.VVAuthenticationProvider - vv_auth_v2 authenticate, seq=329618:23:53.957 [pulsar-web-41-11] INFO auth.server.VVAuthenticationProvider - vv_auth_v2 authenticate vv-role, seq=329718:23:53.958 [pulsar-web-41-11] INFO org.eclipse.jetty.server.RequestLog - x.x.x.x - - [29/Jul/2021:18:23:53 +0800] "GET /admin/v2/tenants HTTP/1.1" 200 42 "-" "Pulsar-Java-v2.8.0" 1# 在pulsar-admin侧可以看到输出的tenant列表"public""pulsar""tenant_c""tenant_vv"
通过java程序验证
package auth;import auth.client.VVAuthentication;import org.apache.pulsar.client.api.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class AuthTest { private static final Logger log = LoggerFactory.getLogger(AuthTest.class); public static void main (String[] args) throws Exception { AuthTest main = new AuthTest (); main.run(); } String pulsarUrl = "pulsar://x.x.x.x:6650" ; String topic = "persistent://tenant_vv/ns1/auth_test" ; Authentication authentication = new VVAuthentication (); void run () throws Exception { PulsarClient client = PulsarClient.builder() .authentication(authentication) .serviceUrl(pulsarUrl) .build(); consume(client); System.out.println("connect successed " ); client.close(); } void consume (PulsarClient client) throws Exception { Consumer consumer = client.newConsumer() .topic(topic) .subscriptionName("consumer-test" ) .subscribe(); while (true ) { Message m = consumer.receive(); if (m != null ) { log.info("recv " + new String (m.getData())); consumer.acknowledge(m); } else { break ; } } } void send (PulsarClient client) throws Exception { Producer p = client.newProducer() .topic(topic) .create(); for (int i=0 ; i<10 ; i++) { p.newMessage().key("aaa" ).value(("hello " + i).getBytes()).send(); log.info("send " + i); Thread.sleep(1000 ); } p.flush(); p.close(); System.out.println("send done" ); } void testReader (PulsarClient client) throws Exception { Reader reader = client.newReader() .subscriptionName("reader-test" ) .topic(topic) .startMessageId(MessageId.earliest) .create(); while (reader.hasMessageAvailable()) { Message msg = reader.readNext(); log.info("reader recv msg, id=" + msg.getMessageId() + " key=" + msg.getKey() + ", value=" + new String (msg.getData())); } reader.close(); } }