pular支持对消息的传输过程进行加密,加密方式有两种:一种是直接使用TLS,一种是基于加密算法进行数据加密。
本文主要是记录下基于加密算法进行加密的方法。
1. 加解密的流程 pulsar使用AES对数据进行加密,AES的密钥和消息一起转发,并且AES密钥使用ECDSA/RSA进行加密。
如果使用pulsar的加密功能,需要先生成一对密钥:公钥、私钥。生产者使用公钥对AES密钥进行加密,消费者使用私钥对AES密钥进行解密。
由于pulsar本身是不存储密钥的,所以如果生产者和消费者丢失了密钥,则数据就永远不能解密了。
下面分别是生产者加密流程和消费者解密流程:
2. 具体实现方式 2.1 使用openssl生成密钥对 openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem openssl ec -in test_ecdsa_privkey.pem -pubout -outform pkcs8 -out test_ecdsa_pubkey.pem
2.2 实现CryptoKeyReader接口,用于pulsar获取密钥
package encrypt;import org.apache.pulsar.client.api.CryptoKeyReader;import org.apache.pulsar.client.api.EncryptionKeyInfo;import java.io.File;import java.nio.file.Files;import java.util.Map;public class RawFileKeyReader implements CryptoKeyReader { String publicKeyFile = "F:\\download\\test_ecdsa_pubkey.pem" ; String privateKeyFile = "F:\\download\\test_ecdsa_privkey.pem" ; @Override public EncryptionKeyInfo getPublicKey (String keyName, Map<String, String> metadata) { EncryptionKeyInfo info = new EncryptionKeyInfo (); try { System.out.println("getPublicKey" ); info.setKey(Files.readAllBytes(new File (publicKeyFile).toPath())); } catch (Exception e) { e.printStackTrace(); } return info; } @Override public EncryptionKeyInfo getPrivateKey (String keyName, Map<String, String> metadata) { EncryptionKeyInfo info = new EncryptionKeyInfo (); System.out.println("getPrivateKey" ); try { info.setKey(Files.readAllBytes(new File (privateKeyFile).toPath())); } catch (Exception e) { e.printStackTrace(); } return info; } }
2.3 编写生产者和消费者进行测试 package encrypt;import auth.client.VVAuthentication;import org.apache.pulsar.client.api.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;public class EncryptTest { private static final Logger log = LoggerFactory.getLogger(EncryptTest.class); public static void main (String[] args) throws Exception { EncryptTest main = new EncryptTest (); main.run(); } String topic = "persistent://tenant_c/ns1/topic1" ; void run () throws Exception { PulsarClient client = PulsarClient.builder() .authentication(new VVAuthentication ()) .serviceUrl("pulsar://172.20.140.11:6650" ) .build(); produce(client); consume(client); client.close(); } void produce (PulsarClient client) throws Exception { Producer p = client.newProducer() .topic(topic) .producerName("pro_encrypt" ) .addEncryptionKey("vv_01" ) .cryptoKeyReader(new RawFileKeyReader ()) .create(); MessageId id = p.newMessage(Schema.STRING).key("key-1" ).value("hello world" ).send(); p.flush(); log.info("send " + id); p.close(); } void consume (PulsarClient client) throws Exception { Consumer c = client.newConsumer() .subscriptionName("con_encrypt" ) .topic(topic) .cryptoKeyReader(new RawFileKeyReader ()) .subscribe(); while (true ) { Message msg = c.receive(2 , TimeUnit.SECONDS); if (msg == null ) { break ; } c.acknowledge(msg); log.info("receive " + msg.getKey() + ", " + new String (msg.getData())); } c.close(); } }
输出结果如下:
getPublicKey 12:29:21.462 [main] INFO encrypt.EncryptTest - send 56925:3:-1 getPrivateKey 12:29:21.535 [main] INFO encrypt.EncryptTest - receive key-1, hello world
可以看到在生产者端使用公钥对AES密钥加密,在消费者端使用私钥对AES密钥进行解密。
生产者设置数据加密密钥:
/** * Add public encryption key, used by producer to encrypt the data key. * * <p>At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are * found, a callback {@link CryptoKeyReader#getPrivateKey(String, Map)} and * {@link CryptoKeyReader#getPublicKey(String, Map)} is invoked against each key to load the values of the key. * Application should implement this callback to return the key in pkcs8 format. If compression is enabled, message * is encrypted after compression. If batch messaging is enabled, the batched message is encrypted. * * @param key * the name of the encryption key in the key store * @return the producer builder instance */ ProducerBuilder<T> addEncryptionKey(String key);
当生产者创建时,pulsar客户端检查是否添加了密钥,如果找到了就回调CryptoKeyReader的方法获取公钥/私钥。应用需要实现CryptoKeyReader的方法,并返回pkcs8格式的密钥。如果消息开启了压缩,则消息会在压缩后被加密。如果消息是批量发送的,则消息被打包后会被加密。