认证和授权(1)

1. 概述

本系列文章准备详细的说明pulsar的认证和授权流程,并结合市场上常见的主-主账号授权、主-子账号授权需求提供解决办法。

2. 整体结构

首先我们需要对pulsar有一个整体的认识,pulsar本身是一个分布式部署的消息订阅分发中间件,它主要包含三个组件:

zookeeper:负责配置管理

broker:对外提供服务,接收发布订阅请求

bookkeeper:负责消息的持久化

image-20210611184242420

3. 通信方式

pulsar提供了两种服务,一种是http的,主要负责处理创建tenant、namespace、topic和获取集群状态等信息;另外一种是基于netty写的tcp服务,主要用于接收客户端发送的消息和订阅请求。

http服务都在org.apache.pulsar.broker.admin包下,可以看到对于cluster、broker、tenant、namespace等都提供了对应的api操作接口。

tcp服务的核心处理类是org.apache.pulsar.broker.service.ServerCnx。

消息格式为TLV格式的二进制数据,每条消息的前4个字节为消息的总长度。

看下具体的消息格式:

image-20231007104802008

4. 认证处理逻辑

客户端和broker之间是TCP长连接,在客户端连接到broker后,broker会对客户端的合法性进行验证,如果不合法则断开连接,判断流程是:

  1. 判断是否开启了连接认证(conf/broker.conf:authenticationEnabled)
  2. 如果没有开启认证,则返回连接成功
  3. 如果开启了认证,则获取认证内容、认证客户端选择的服务端认证策略
  4. 如果服务端不支持客户端提供的认证策略,则返回认证失败
  5. 如果支持,则调用策略的认证方法(AuthenticationProvider.authenticate)
  6. 如果认证过程没有抛出异常,则返回认证成功,并保存认证结果(这里的认证结果就是role,后续会根据role对客户端的其他操作进行授权管理)
  7. 认证过程完成。

认证处理包含几个重要的接口

  • 服务端(broker):

    org.apache.pulsar.broker.authentication.AuthenticationProvider,核心方法如下:


@Override
public void initialize(ServiceConfiguration config) throws IOException {
// 初始化时使用,在broker启动的时候初始化。
}
@Override
public String getAuthMethodName() {
// 范围认证策略的名称,需要和客户端发送的名称保持一致。
}
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
// 进行客户端认证,返回客户端的角色名称。
}
@Override
public void close() throws IOException {
// broker关闭时调用此方法。
}
  • 客户端:

    org.apache.pulsar.client.api.Authentication

    org.apache.pulsar.client.api.AuthenticationDataProvider,核心方法如下:

Authentication

@Override
public String getAuthMethodName() {
// 认证策略的名称。
}
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
// 返回认证数据持有对象,即另外一个核心接口的实现类。
}
@Override
public void configure(Map<String, String> authParams) {
// 程序启动时调用,可以得到配置文件中的配置项。
}
@Override
public void start() throws PulsarClientException {
// 程序启动时调用
}
@Override
public void close() throws IOException {
// 程序停止时调用
}

AuthenticationDataProvider

@Override
public boolean hasDataForHttp() {
// http区是否有数据
}
@Override
public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
// 获取http头信息,如果上述方法返回true,则这里要设置http header内容。然后broker端才能根据http header name获取到数据。
}
@Override
public boolean hasDataFromCommand() {
// 是否有命令数据
}
@Override
public String getCommandData() {
// 如果上述方法返回true,则这里要返回broker端需要的数据。
}

5. 授权处理逻辑

授权需要在客户端连接认证的基础上进行,根据连接认证完成后生成的role对后续的操作进行权限管理。

授权管理包含两部分:

  • 使用pulsar-admin管理集群时的权限验证(http协议)
  • 客户端连接到broker进行数据读写时的权限验证(tcp协议)

上述两部分授权的处理逻辑其实没有区别,都是先通过连接认证获取获取客户端的role,然后根据role判断客户端是否有对应操作的权限。由于上边已经说了role的获取途径,下面重点写下pulsar对于客户端的哪些操作进行了权限的判断。

image-20231007105427754

6. 自定义认证和授权

自定义认证和授权只需要实现前两节中列出的接口即可。

7. 第三方认证和授权中心

为了能更好的对客户端的权限进行管理,需要一个统一的第三方认证中心,记录tenant、namespace和topic的信息以及用户信息,并建立起用户和操作之间的权限关系。这样,在客户端连接到broker后,broker就可以根据客户端传递的用户信息对客户端操作进行权限验证。

8. 具体实现(不包含真正的权限验证部分,只是为了测试整体流程)

8.1 实现broker端认证接口

package auth.server;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.AuthenticationException;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
* @function
* @date 2021/7/27 14:19
*/
public class VVAuthenticationProvider implements AuthenticationProvider {
private static final Logger log = LoggerFactory.getLogger(VVAuthenticationProvider.class);
private static final String methodName = "vv_auth_v2";
private AtomicLong seq = new AtomicLong();
private String header = "vv_auth";
@Override
public void initialize(ServiceConfiguration config) throws IOException {
log.info(methodName + " initialize" + ", seq=" + seq.incrementAndGet());
if (config == null) {
return;
}
Set<String> superRoles = config.getSuperUserRoles();
if (superRoles == null) {
return;
}
for (String role : superRoles) {
log.info(methodName + " initialize " + role + ", seq=" + seq.incrementAndGet());
}
}
@Override
public String getAuthMethodName() {
log.info(methodName + " getAuthMethodName");
return methodName;
}
@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
log.info(methodName + " authenticate" + ", seq=" + seq.incrementAndGet());
String roleToken = "unknown";
if (authData.hasDataFromCommand()) {
roleToken = authData.getCommandData();
} else if (authData.hasDataFromHttp()) {
roleToken = authData.getHttpHeader(header);
} else {
throw new AuthenticationException("Authentication data source does not have a role token");
}
log.info(methodName + " authenticate " + roleToken + ", seq=" + seq.incrementAndGet());
return roleToken;
}
@Override
public void close() throws IOException {
log.info(methodName + " close" + ", seq=" + seq.incrementAndGet());
}
}

8.2 实现客户端认证接口

 package auth.client;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
* @function
* @date 2021/7/27 14:00
*/
public class VVAuthentication implements Authentication {
private static final Logger log = LoggerFactory.getLogger(VVAuthentication.class);
private static final String methodName = "vv_auth_v2";
@Override
public String getAuthMethodName() {
log.info(methodName + " getAuthMethodName");
return methodName;
}
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
log.info(methodName + " getAuthData");
return new VVAuthenticationDataProvider();
}
@Override
public void configure(Map<String, String> authParams) {
log.info(methodName + " configure");
if (authParams == null) {
return;
}
authParams.forEach((key, value) -> {
log.info(methodName + " configure " + key + "=" + value);
});
}
@Override
public void start() throws PulsarClientException {
log.info(methodName + " start");
}
@Override
public void close() throws IOException {
log.info(methodName + " close");
}
}

package auth.client;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @function
* @date 2021/7/27 14:08
*/
public class VVAuthenticationDataProvider implements AuthenticationDataProvider {
private static final Logger log = LoggerFactory.getLogger(VVAuthenticationDataProvider.class);
private static final String methodName = "vv_auth_v2";
private String header = "vv_auth";
private String token = "vv-role";
@Override
public boolean hasDataForHttp() {
log.info(methodName + " hasDataForHttp");
return true;
}
@Override
public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
log.info(methodName + " getHttpHeaders");
Map<String, String> headers = new HashMap<>();
headers.put(header, token);
return headers.entrySet();
}
@Override
public boolean hasDataFromCommand() {
log.info(methodName + " hasDataFromCommand");
return true;
}
@Override
public String getCommandData() {
log.info(methodName + " getCommandData");
return token;
}
}

8.3 修改conf/broker.conf配置文件


### --- Authentication --- ###
# Enable authentication
# 开启连接认证
authenticationEnabled=true
# Authentication provider name list, which is comma separated list of class names
# 自定义的broker端实现的处理类
authenticationProviders=auth.server.VVAuthenticationProvider
# Interval of time for checking for expired authentication credentials
authenticationRefreshCheckSeconds=60
# Enforce authorization
# 开启授权认证
authorizationEnabled=true
# Authorization provider fully qualified class-name
# 自定义的授权认证处理类
authorizationProvider=auth.server.VVPulsarAuthorizationProvider
# Allow wildcard matching in authorization
# (wildcard matching only applicable if wildcard-char:
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false
# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
# 超级用户,创建tenant的时候需要超级用户
superUserRoles=vv-role,cc-role
# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in same or other clusters
brokerClientTlsEnabled=false
# 自定义的客户端认证实现类
brokerClientAuthenticationPlugin=auth.client2.client.VVAuthentication
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=

8.4 重启broker

./bin/pulsar-daemon stop broker

./bin/pulsar-daemon start broker

8.5 验证是否有效

通过pulsar-admin命令进行验证

首先配置conf/client.conf文件

authPlugin=auth.client2.client.VVAuthenticationauthParams=

执行命令并验证结果

# 执行以下命令./bin/pulsar-admin tenants list# 客户端侧输出日志18:21:52.173 [main] INFO  auth.client.VVAuthentication - vv_auth_v2 configure18:21:52.175 [main] INFO  auth.client.VVAuthentication - vv_auth_v2 configure token=123456vv18:21:52.335 [main] INFO  auth.client.VVAuthentication - vv_auth_v2 getAuthMethodName18:21:52.336 [main] INFO  auth.client.VVAuthentication - vv_auth_v2 start18:21:52.611 [main] INFO  auth.client.VVAuthentication - vv_auth_v2 getAuthData18:21:52.612 [main] INFO  auth.client.VVAuthenticationDataProvider - vv_auth_v2 hasDataForHttp18:21:52.789 [main] INFO  auth.client.VVAuthenticationDataProvider - vv_auth_v2 hasDataForHttp18:21:52.789 [main] INFO  auth.client.VVAuthenticationDataProvider - vv_auth_v2 getHttpHeaders# 服务端(broker)侧输出日志18:23:53.957 [pulsar-web-41-11] INFO  auth.server.VVAuthenticationProvider - vv_auth_v2 authenticate, seq=329618:23:53.957 [pulsar-web-41-11] INFO  auth.server.VVAuthenticationProvider - vv_auth_v2 authenticate vv-role, seq=329718:23:53.958 [pulsar-web-41-11] INFO  org.eclipse.jetty.server.RequestLog - x.x.x.x - - [29/Jul/2021:18:23:53 +0800] "GET /admin/v2/tenants HTTP/1.1" 200 42 "-" "Pulsar-Java-v2.8.0" 1# 在pulsar-admin侧可以看到输出的tenant列表"public""pulsar""tenant_c""tenant_vv"

通过java程序验证

package auth;
import auth.client.VVAuthentication;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @function
* @date 2021/7/19 10:47
*/
public class AuthTest {
private static final Logger log = LoggerFactory.getLogger(AuthTest.class);
public static void main(String[] args) throws Exception {
AuthTest main = new AuthTest();
main.run();
}
String pulsarUrl = "pulsar://x.x.x.x:6650";
String topic = "persistent://tenant_vv/ns1/auth_test";
Authentication authentication = new VVAuthentication();
void run() throws Exception {
PulsarClient client = PulsarClient.builder()
.authentication(authentication)
.serviceUrl(pulsarUrl)
.build();
// send(client);
// testReader(client);
consume(client);
System.out.println("connect successed ");
client.close();
}
void consume(PulsarClient client) throws Exception {
Consumer consumer = client.newConsumer()
.topic(topic)
.subscriptionName("consumer-test")
.subscribe();
while (true) {
Message m = consumer.receive();
if (m != null) {
log.info("recv " + new String(m.getData()));
consumer.acknowledge(m);
} else {
break;
}
}
}
void send(PulsarClient client) throws Exception {
Producer p = client.newProducer()
.topic(topic)
.create();
for (int i=0; i<10; i++) {
p.newMessage().key("aaa").value(("hello " + i).getBytes()).send();
log.info("send " + i);
Thread.sleep(1000);
}
p.flush();
p.close();
System.out.println("send done");
}
void testReader(PulsarClient client) throws Exception {
Reader reader = client.newReader()
.subscriptionName("reader-test")
.topic(topic)
.startMessageId(MessageId.earliest)
// .startMessageId(DefaultImplementation.newMessageId(121493, -1, -1))
.create();
while (reader.hasMessageAvailable()) {
Message msg = reader.readNext();
log.info("reader recv msg, id=" + msg.getMessageId() + " key=" + msg.getKey() + ", value=" + new String(msg.getData()));
}
reader.close();
}
}
Author: iMine
Link: https://imine141.github.io/2021/07/29/pulsar/pulsar%E8%AE%A4%E8%AF%81%E5%92%8C%E6%8E%88%E6%9D%83(1)/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.