mLC

Reputation: 693

How to encrypt a column in Pandas/Spark dataframe using AWS KMS

I want to encrypt the values in one column of my Pandas (or PySpark) dataframe, e.g. to take the column mobno in the following dataframe, encrypt it, and put the result in the encrypted_value column:

[image: encrypted-dataframe]

I want to use an AWS KMS encryption key. My question is: what is the most elegant way to achieve this?

I am thinking about using a UDF that calls boto3's KMS client. Something like:

import boto3
from pyspark.sql.functions import udf

kms_client = boto3.client('kms')  # assumes AWS credentials and region are configured

@udf
def encrypt(plaintext):
    response = kms_client.encrypt(
        KeyId=aws_kms_key_id,  # the ID or ARN of the KMS key
        Plaintext=plaintext
    )
    return response['CiphertextBlob']

and then applying this UDF to the whole column.
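
For illustration, applying it would look something like this (assuming the dataframe is df and the column is mobno, as above):

from pyspark.sql import functions as F

df = df.withColumn('encrypted_value', encrypt(F.col('mobno')))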

But I am not quite confident this is the right way, which stems from the fact that I am an encryption rookie. First, I don't even know whether this kms_client.encrypt function is meant for encrypting values (from the columns) or for manipulating the keys themselves. Maybe the better way is to obtain the key and then use some Python encryption library (such as hashlib).

I would like some clarification on the encryption process and also a recommendation on the best approach to column encryption.

Upvotes: 6

Views: 9461

Answers (2)

ZygD

Reputation: 24478

Since Spark 3.3 you can do AES encryption (and decryption) without a UDF.

aes_encrypt(expr, key[, mode[, padding]]) - Returns an encrypted value of expr using AES in the given mode with the specified padding. Key lengths of 16, 24 and 32 bytes are supported. Supported combinations of (mode, padding) are ('ECB', 'PKCS') and ('GCM', 'NONE'). The default mode is GCM.

Arguments:
expr - The binary value to encrypt.
key - The passphrase to use to encrypt the data.
mode - Specifies which block cipher mode should be used to encrypt messages. Valid modes: ECB, GCM.
padding - Specifies how to pad messages whose length is not a multiple of the block size. Valid values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB and NONE for GCM.

from pyspark.sql import functions as F
df = spark.createDataFrame([('8223344556',)], ['mobno'])

df = df.withColumn('encrypted_value', F.expr("aes_encrypt(mobno, 'your_secret_keyy')"))

df.show()
#  +----------+--------------------+
#  |     mobno|     encrypted_value|
#  +----------+--------------------+
#  |8223344556|[9B 33 DB 9B 5D C...|
#  +----------+--------------------+

df.printSchema()
#  root
#   |-- mobno: string (nullable = true)
#   |-- encrypted_value: binary (nullable = true)
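
Decryption is the mirror image, using aes_decrypt with the same key. A minimal round-trip sketch (aes_decrypt returns binary, so cast it back to string):

df.withColumn(
    'decrypted_value',
    F.expr("cast(aes_decrypt(encrypted_value, 'your_secret_keyy') as string)")
).show()
#  decrypted_value should contain the original '8223344556'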

Upvotes: 1

Chris

Reputation: 1459

To avoid many calls to the KMS service from a UDF, you can instead use AWS Secrets Manager to retrieve your encryption key, and pycrypto (or its maintained fork, pycryptodome) to encrypt the column. The following works:

import boto3
import json

from pyspark.sql.functions import udf, col
from Crypto.Cipher import AES

region_name = "eu-west-1"
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)  # secret_name is the name of your secret
secret_key = json.loads(get_secret_value_response['SecretString'])
clear_text_column = 'mobno'

def encrypt(key, text):
    # the key must be bytes of length 16, 24 or 32, and the IV must be 16 bytes
    obj = AES.new(key, AES.MODE_CFB, b'This is an IV456')
    return obj.encrypt(text.encode('utf-8'))

def udf_encrypt(key):
    return udf(lambda text: encrypt(key, text))

df.withColumn("encrypted", udf_encrypt(secret_key)(col(clear_text_column))).show()

Or alternatively, using a more efficient Pandas UDF as suggested by @Vektor88 (PySpark 3 syntax):

import pandas as pd
from functools import partial
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import BinaryType

encrypt_with_key = partial(encrypt, secret_key)

@pandas_udf(BinaryType())
def pandas_udf_encrypt(clear_strings: pd.Series) -> pd.Series:
    return clear_strings.apply(encrypt_with_key)

df.withColumn('encrypted', pandas_udf_encrypt(clear_text_column)).show()

Upvotes: 0
