Encrypt Kafka Records

Author: MacroNova

November 2, 2018

In the age of GDPR, data protection has become an even more important aspect of systems that persist sensitive information. Various financial institutions have publicly confirmed their adoption of Apache Kafka. This blog post describes an application-agnostic solution that guarantees data confidentiality, authenticity, and integrity. We will see how to encrypt and sign records published to Kafka topics without modifying application code.

Different components of a distributed system could be responsible for data encryption. Kafka supports TLS connectivity between client applications and brokers, which secures the link and would allow running the encryption algorithm on the server side. However, introducing a new CPU-intensive routine on the brokers could degrade performance. Moreover, the combined computing power of all client applications is usually much greater than the capacity of the Kafka cluster itself. Assuming we decide to encrypt data on the client side, how can we accomplish the task without updating application code?

While initializing a Kafka producer or consumer, developers specify concrete key and value serializer classes. The open-source project kafka-serde-ext introduces a chained serializer and deserializer, responsible for executing a configurable number of serializers, passing the output (usually a byte array) of one to the next. The snippet below shows how to configure the chained and encrypting serializers. The value of each Kafka record is encrypted with the symmetric AES algorithm, using a passphrase shared between producer and consumer.

# Producer
key.serializer = org.apache.kafka.common.serialization.LongSerializer
value.serializer = io.macronova.kafka.common.serialization.ChainedSerializer
0.serializer = org.apache.kafka.common.serialization.StringSerializer
1.serializer = io.macronova.kafka.common.serialization.EncryptSerializer
1.transformation = AES/ECB/PKCS5Padding
1.secret = 770A8A65DA156D24EE2A093277530142

# Consumer
key.deserializer = org.apache.kafka.common.serialization.LongDeserializer
value.deserializer = io.macronova.kafka.common.serialization.ChainedDeserializer
0.deserializer = io.macronova.kafka.common.serialization.DecryptDeserializer
0.transformation = AES/ECB/PKCS5Padding
0.secret = 770A8A65DA156D24EE2A093277530142
1.deserializer = org.apache.kafka.common.serialization.StringDeserializer

Listing 1: Kafka producer and consumer configuration to achieve symmetric encryption.

Chained serializers execution.
Figure 1: Chained execution of serializers and deserializers.

In the above example, the chained serializer first transforms a Java string to a byte array and then executes the encryption algorithm. The deserializer performs the reverse sequence of operations. Please note that the chained serializer and deserializer allow configuration parameters to be passed to each individual SerDe.
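To see what the encryption step of the chain boils down to, below is a minimal sketch of an AES round-trip using the standard JCE API, with the transformation and hex-encoded secret from Listing 1. This is an illustration of the underlying cipher call, not the actual EncryptSerializer implementation; also note that ECB mode is used here only because the listing uses it — for production, a mode with an IV such as CBC or GCM is generally preferable.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class AesRoundTrip {
    // Hex-decode the shared secret from the listing into raw key bytes.
    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Run the configured transformation in the given mode (encrypt or decrypt).
    static byte[] crypt(int mode, byte[] key, byte[] data) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(mode, new SecretKeySpec(key, "AES"));
        return cipher.doFinal(data);
    }

    public static void main(String[] args) throws Exception {
        // 32 hex characters = 16 key bytes = AES-128.
        byte[] key = hexToBytes("770A8A65DA156D24EE2A093277530142");
        byte[] plain = "sensitive payload".getBytes(StandardCharsets.UTF_8);
        byte[] encrypted = crypt(Cipher.ENCRYPT_MODE, key, plain);
        byte[] decrypted = crypt(Cipher.DECRYPT_MODE, key, encrypted);
        System.out.println(Arrays.equals(plain, decrypted)); // true
    }
}
```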

EncryptSerializer from the kafka-serde-ext project supports both symmetric and asymmetric cryptography algorithms. Asymmetric encryption cannot encode data whose size exceeds the key length (e.g. around 256 bytes in the case of a 2048-bit RSA key). To overcome this limitation, users may leverage the hybrid encryption scheme implemented by HybridEncryptSerializer. Hybrid encryption first generates a random secret passphrase and encodes it using asymmetric cryptography. This passphrase is then used as the input of a symmetric algorithm to secure data of any size. The diagram below represents data encrypted with the hybrid scheme. Public-key cryptosystems are convenient because they do not require sender and receiver to share a common secret.
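The hybrid scheme can be sketched with plain JCE as follows. This is a conceptual illustration, not the kafka-serde-ext implementation: the key pair is generated in-memory instead of being loaded from keystore.jks, and the standard JDK transformation name `RSA/ECB/PKCS1Padding` is used (the `RSA/None/PKCS1Padding` name from the listing requires an additional provider such as Bouncy Castle).

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.util.Arrays;

public class HybridSketch {
    public static void main(String[] args) throws Exception {
        // Receiver's RSA key pair (in Listing 2, loaded from the keystore).
        KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
        kpg.initialize(2048);
        KeyPair pair = kpg.generateKeyPair();

        // 1. Generate a random one-time AES key.
        KeyGenerator kg = KeyGenerator.getInstance("AES");
        kg.init(128);
        SecretKey aesKey = kg.generateKey();

        // 2. Wrap (encrypt) the AES key with the receiver's RSA public key.
        Cipher rsa = Cipher.getInstance("RSA/ECB/PKCS1Padding");
        rsa.init(Cipher.ENCRYPT_MODE, pair.getPublic());
        byte[] wrappedKey = rsa.doFinal(aesKey.getEncoded());

        // 3. Encrypt the payload of arbitrary size with the symmetric key.
        Cipher aes = Cipher.getInstance("AES/ECB/PKCS5Padding");
        aes.init(Cipher.ENCRYPT_MODE, aesKey);
        byte[] payload = "record value of arbitrary length".getBytes(StandardCharsets.UTF_8);
        byte[] ciphertext = aes.doFinal(payload);

        // Receiver: unwrap the AES key with the RSA private key, then decrypt.
        rsa.init(Cipher.DECRYPT_MODE, pair.getPrivate());
        SecretKey restored = new SecretKeySpec(rsa.doFinal(wrappedKey), "AES");
        aes.init(Cipher.DECRYPT_MODE, restored);
        byte[] decrypted = aes.doFinal(ciphertext);
        System.out.println(Arrays.equals(payload, decrypted)); // true
    }
}
```

The serialized record then carries both parts shown in Figure 2: the RSA-wrapped symmetric key and the AES-encrypted payload.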

Hybrid encryption data representation.
Figure 2: Representation of data encrypted with hybrid approach.

# Producer
key.serializer = org.apache.kafka.common.serialization.LongSerializer
value.serializer = io.macronova.kafka.common.serialization.ChainedSerializer
0.serializer = org.apache.kafka.common.serialization.StringSerializer
1.serializer = io.macronova.kafka.common.serialization.HybridEncryptSerializer
1.symmetric.transformation = AES/ECB/PKCS5Padding
1.asymmetric.transformation = RSA/None/PKCS1Padding
1.asymmetric.key.store.path = /cert/keystore.jks
1.asymmetric.key.store.password = changeit
1.asymmetric.key.store.alias = my-key

# Consumer
key.deserializer = org.apache.kafka.common.serialization.LongDeserializer
value.deserializer = io.macronova.kafka.common.serialization.ChainedDeserializer
0.deserializer = io.macronova.kafka.common.serialization.HybridDecryptDeserializer
0.symmetric.transformation = AES/ECB/PKCS5Padding
0.asymmetric.transformation = RSA/None/PKCS1Padding
0.asymmetric.key.store.path = /cert/keystore.jks
0.asymmetric.key.store.password = changeit
0.asymmetric.key.store.alias = my-key
0.asymmetric.key.store.alias.password = changeit
1.deserializer = org.apache.kafka.common.serialization.StringDeserializer

Listing 2: Kafka producer and consumer configuration to achieve hybrid encryption.

To minimize the performance impact of data encryption, try to compress each record first, for example by using the Apache Avro message format (with Confluent Schema Registry) instead of sending XML or JSON text. Batch-level compression applied by the Kafka producer (the compression.type property) does not work efficiently on already-encrypted content, because ciphertext is statistically indistinguishable from random data.
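The effect is easy to demonstrate with the DEFLATE algorithm from the JDK (the same family of compression Kafka uses for its gzip codec). In this illustrative sketch, repetitive JSON-like plaintext shrinks dramatically, while random bytes standing in for ciphertext do not compress at all:

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.zip.Deflater;

public class CompressionCheck {
    // Return the size of the input after DEFLATE compression.
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] buf = new byte[input.length * 2 + 64];
        int n = deflater.deflate(buf);
        deflater.end();
        return n;
    }

    public static void main(String[] args) {
        // Repetitive JSON-like plaintext compresses very well...
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) sb.append("{\"account\":\"1234\",\"amount\":42}");
        byte[] plaintext = sb.toString().getBytes(StandardCharsets.UTF_8);

        // ...while ciphertext looks like random noise and barely compresses.
        byte[] randomLike = new byte[plaintext.length];
        new SecureRandom().nextBytes(randomLike);

        System.out.println("plaintext:  " + plaintext.length + " -> " + compressedSize(plaintext));
        System.out.println("ciphertext: " + randomLike.length + " -> " + compressedSize(randomLike));
    }
}
```

This is why compression must happen before encryption in the serializer chain, never after.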

The previous sections of the blog post described how to encrypt sensitive Kafka records. Now let us see how to guarantee the authenticity and integrity of data by adding a digital signature. The kafka-serde-ext project provides the easy-to-use GenerateSignatureSerializer and VerifySignatureDeserializer. The serializer prepends a digital signature to the output byte array. The deserializer verifies the signature and raises a SerializationException when it turns out to be invalid (wrapped inside a KafkaException by the KafkaConsumer#poll() method). The listing below demonstrates the usage of a digital signature together with symmetric encryption.
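Under the hood, signing and verification correspond to the standard java.security.Signature API with the SHA256withRSA algorithm from the listing. The sketch below illustrates the principle only (an in-memory key pair is used instead of keystore.jks, and the exact wire format produced by GenerateSignatureSerializer is not reproduced):

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

public class SignatureSketch {
    // Sign the serialized record bytes with the producer's private key.
    static byte[] sign(PrivateKey key, byte[] data) throws GeneralSecurityException {
        Signature sig = Signature.getInstance("SHA256withRSA");
        sig.initSign(key);
        sig.update(data);
        return sig.sign();
    }

    // Verify the signature with the matching public key on the consumer side.
    static boolean verify(PublicKey key, byte[] data, byte[] signature) throws GeneralSecurityException {
        Signature sig = Signature.getInstance("SHA256withRSA");
        sig.initVerify(key);
        sig.update(data);
        return sig.verify(signature);
    }

    public static void main(String[] args) throws Exception {
        KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
        kpg.initialize(2048);
        KeyPair pair = kpg.generateKeyPair();

        byte[] record = "encrypted record bytes".getBytes(StandardCharsets.UTF_8);
        byte[] signature = sign(pair.getPrivate(), record); // 256 bytes for a 2048-bit key

        System.out.println(verify(pair.getPublic(), record, signature)); // true
        record[0] ^= 1; // any tampering with the payload...
        System.out.println(verify(pair.getPublic(), record, signature)); // ...yields false
    }
}
```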

# Producer
key.serializer = org.apache.kafka.common.serialization.LongSerializer
value.serializer = io.macronova.kafka.common.serialization.ChainedSerializer
0.serializer = org.apache.kafka.common.serialization.StringSerializer
1.serializer = io.macronova.kafka.common.serialization.EncryptSerializer
1.transformation = AES/ECB/PKCS5Padding
1.secret = 770A8A65DA156D24EE2A093277530142
2.serializer = io.macronova.kafka.common.serialization.GenerateSignatureSerializer
2.algorithm = SHA256withRSA
2.key.store.path = /cert/keystore.jks
2.key.store.password = changeit
2.key.store.alias = my-key
2.key.store.alias.password = changeit

# Consumer
key.deserializer = org.apache.kafka.common.serialization.LongDeserializer
value.deserializer = io.macronova.kafka.common.serialization.ChainedDeserializer
0.deserializer = io.macronova.kafka.common.serialization.VerifySignatureDeserializer
0.algorithm = SHA256withRSA
0.key.store.path = /cert/keystore.jks
0.key.store.password = changeit
0.key.store.alias = my-key
1.deserializer = io.macronova.kafka.common.serialization.DecryptDeserializer
1.transformation = AES/ECB/PKCS5Padding
1.secret = 770A8A65DA156D24EE2A093277530142
2.deserializer = org.apache.kafka.common.serialization.StringDeserializer

Listing 3: Configuration of Kafka producer and consumer that encrypts and signs record value.

The Apache Kafka serialization interface allows technical code to be nicely decoupled from business logic. Feel free to use the kafka-serde-ext project and contribute new ideas! For detailed documentation of all the mentioned serializers and working samples, consult the documentation available on GitHub. Happy coding!