pulsar对消息进行加密

pular支持对消息的传输过程进行加密,加密方式有两种:一种是直接使用TLS,一种是基于加密算法进行数据加密。

本文主要是记录下基于加密算法进行加密的方法。

1. 加解密的流程

pulsar使用AES对数据进行加密,AES的密钥和消息一起转发,并且AES密钥使用ECDSA/RSA进行加密。

如果使用pulsar的加密功能,需要先生成一对密钥:公钥、私钥。生产者使用公钥对AES密钥进行加密,消费者使用私钥对AES密钥进行解密。

由于pulsar本身是不存储密钥的,所以如果生产者和消费者丢失了密钥,则数据就永远不能解密了。

下面分别是生产者加密流程和消费者解密流程:

图片

图片

2. 具体实现方式

2.1 使用openssl生成密钥对

openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
openssl ec -in test_ecdsa_privkey.pem -pubout -outform pkcs8 -out test_ecdsa_pubkey.pem

2.2 实现CryptoKeyReader接口,用于pulsar获取密钥

package encrypt;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import java.io.File;
import java.nio.file.Files;
import java.util.Map;
/**
* @function
* @date 2021/7/27 9:45
*/
public class RawFileKeyReader implements CryptoKeyReader {
String publicKeyFile = "F:\\download\\test_ecdsa_pubkey.pem";
String privateKeyFile = "F:\\download\\test_ecdsa_privkey.pem";
@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
EncryptionKeyInfo info = new EncryptionKeyInfo();
try {
System.out.println("getPublicKey");
info.setKey(Files.readAllBytes(new File(publicKeyFile).toPath()));
} catch (Exception e) {
e.printStackTrace();
}
return info;
}
@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
EncryptionKeyInfo info = new EncryptionKeyInfo();
System.out.println("getPrivateKey");
try {
info.setKey(Files.readAllBytes(new File(privateKeyFile).toPath()));
} catch (Exception e) {
e.printStackTrace();
}
return info;
}
}

2.3 编写生产者和消费者进行测试

package encrypt;
import auth.client.VVAuthentication;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**

* @function
* @date 2021/7/27 8:47
*/
public class EncryptTest {
private static final Logger log = LoggerFactory.getLogger(EncryptTest.class);
public static void main(String[] args) throws Exception {
EncryptTest main = new EncryptTest();
main.run();
}
String topic = "persistent://tenant_c/ns1/topic1";
void run() throws Exception {
PulsarClient client = PulsarClient.builder()
.authentication(new VVAuthentication())
.serviceUrl("pulsar://172.20.140.11:6650")
.build();
produce(client);
consume(client);
client.close();
}
void produce(PulsarClient client) throws Exception {
Producer p = client.newProducer()
.topic(topic)
.producerName("pro_encrypt")
// 设置AES密钥,如果不设置则不会对消息加密
.addEncryptionKey("vv_01")
// 设置从哪获取公钥/私钥
.cryptoKeyReader(new RawFileKeyReader())
.create();
MessageId id = p.newMessage(Schema.STRING).key("key-1").value("hello world").send();
p.flush();
log.info("send " + id);
p.close();
}
void consume(PulsarClient client) throws Exception {
Consumer c = client.newConsumer()
.subscriptionName("con_encrypt")
.topic(topic)
.cryptoKeyReader(new RawFileKeyReader())
.subscribe();
while (true) {
Message msg = c.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
c.acknowledge(msg);
log.info("receive " + msg.getKey() + ", " + new String(msg.getData()));
}
c.close();
}
}

输出结果如下:

getPublicKey
12:29:21.462 [main] INFO encrypt.EncryptTest - send 56925:3:-1
getPrivateKey
12:29:21.535 [main] INFO encrypt.EncryptTest - receive key-1, hello world

可以看到在生产者端使用公钥对AES密钥加密,在消费者端使用私钥对AES密钥进行解密。

生产者设置数据加密密钥:

/**
* Add public encryption key, used by producer to encrypt the data key.
*
* <p>At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are
* found, a callback {@link CryptoKeyReader#getPrivateKey(String, Map)} and
* {@link CryptoKeyReader#getPublicKey(String, Map)} is invoked against each key to load the values of the key.
* Application should implement this callback to return the key in pkcs8 format. If compression is enabled, message
* is encrypted after compression. If batch messaging is enabled, the batched message is encrypted.
*
* @param key
* the name of the encryption key in the key store
* @return the producer builder instance
*/
ProducerBuilder<T> addEncryptionKey(String key);

当生产者创建时,pulsar客户端检查是否添加了密钥,如果找到了就回调CryptoKeyReader的方法获取公钥/私钥。应用需要实现CryptoKeyReader的方法,并返回pkcs8格式的密钥。如果消息开启了压缩,则消息会在压缩后被加密。如果消息是批量发送的,则消息被打包后会被加密。

Author: iMine
Link: https://imine141.github.io/2021/08/06/pulsar/pulsar%E5%AF%B9%E6%B6%88%E6%81%AF%E8%BF%9B%E8%A1%8C%E5%8A%A0%E5%AF%86/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.