Reputation: 693
I want to encrypt values in one column of my Pandas (or PySpark) dataframe, e.g. to take the column mobno
in the following dataframe, encrypt it, and put the result in the encrypted_value
column:
I want to use an AWS KMS encryption key. My question is: what is the most elegant way to achieve this?
I am thinking about using a UDF which will call boto3's KMS client. Something like:
import boto3
from pyspark.sql.functions import udf

kms_client = boto3.client('kms')

@udf
def encrypt(plaintext):
    # encrypt a single value under the given KMS key
    response = kms_client.encrypt(
        KeyId=aws_kms_key_id,
        Plaintext=plaintext
    )
    ciphertext = response['CiphertextBlob']
    return ciphertext
and then applying this UDF on the whole column.
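In code, that application step might look something like this (nothing new here, just the encrypt UDF and the mobno column from above):
from pyspark.sql.functions import col

df = df.withColumn('encrypted_value', encrypt(col('mobno')))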
But I am not quite confident this is the right way. This stems from the fact that I am an encryption rookie - first, I don't even know whether this kms_client.encrypt
function is meant for encrypting values (from the columns) or for manipulating the keys. Maybe a better way is to obtain the key and then use some Python encryption library (such as hashlib
).
I would like to have some clarification on the encryption process, and also a recommendation on what the best approach to column encryption is.
Upvotes: 6
Views: 9461
Reputation: 24478
Since Spark 3.3 you can do AES encryption (and decryption) without a UDF:
aes_encrypt(expr, key[, mode[, padding]]) - Returns an encrypted value of expr using AES in given mode with the specified padding. Key lengths of 16, 24 and 32 bytes are supported. Supported combinations of (mode, padding) are ('ECB', 'PKCS') and ('GCM', 'NONE'). The default mode is GCM.
Arguments:
expr - The binary value to encrypt.
key - The passphrase to use to encrypt the data.
mode - Specifies which block cipher mode should be used to encrypt messages. Valid modes: ECB, GCM.
padding - Specifies how to pad messages whose length is not a multiple of the block size. Valid values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB and NONE for GCM.
from pyspark.sql import functions as F
df = spark.createDataFrame([('8223344556',)], ['mobno'])
df = df.withColumn('encrypted_value', F.expr("aes_encrypt(mobno, 'your_secret_keyy')"))
df.show()
# +----------+--------------------+
# | mobno| encrypted_value|
# +----------+--------------------+
# |8223344556|[9B 33 DB 9B 5D C...|
# +----------+--------------------+
df.printSchema()
# root
# |-- mobno: string (nullable = true)
# |-- encrypted_value: binary (nullable = true)
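Since aes_decrypt is available as well (also Spark 3.3+), the value can be recovered with the same key. A minimal sketch; aes_decrypt returns binary, hence the cast back to string:
df.withColumn(
    'decrypted',
    F.expr("cast(aes_decrypt(encrypted_value, 'your_secret_keyy') as string)")
).show()
# the decrypted column again contains the original value, 8223344556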
Upvotes: 1
Reputation: 1459
To avoid many calls to the KMS service in a UDF, use AWS Secrets Manager instead to retrieve your encryption key, and pycryptodome (the maintained fork of pycrypto)
to encrypt the column. The following works:
import json
import boto3
from pyspark.sql.functions import udf, col
from Crypto.Cipher import AES  # provided by pycryptodome

region_name = "eu-west-1"
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
# secret_name is assumed to be defined elsewhere;
# the secret is assumed to hold the AES key (16, 24 or 32 bytes)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
secret_key = json.loads(get_secret_value_response['SecretString'])
clear_text_column = 'mobno'
def encrypt(key, text):
    # a fresh cipher per value; pycryptodome expects bytes for key, IV and plaintext
    obj = AES.new(key, AES.MODE_CFB, b'This is an IV456')
    return obj.encrypt(text.encode('utf-8'))
def udf_encrypt(key):
    return udf(lambda text: encrypt(key, text))
df.withColumn("encrypted", udf_encrypt(secret_key)(col(clear_text_column))).show()
Or alternatively, using a more efficient Pandas UDF as suggested by @Vektor88 (PySpark 3 syntax):
import pandas as pd
from functools import partial
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import BinaryType

encrypt_with_key = partial(encrypt, secret_key)

@pandas_udf(BinaryType())
def pandas_udf_encrypt(clear_strings: pd.Series) -> pd.Series:
    return clear_strings.apply(encrypt_with_key)

df.withColumn('encrypted', pandas_udf_encrypt(clear_text_column)).show()
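For completeness, a sketch of the reverse direction under the same assumptions (the same secret_key and the same fixed IV as in encrypt above):
def decrypt(key, ciphertext):
    # build an identical cipher and undo the CFB encryption
    obj = AES.new(key, AES.MODE_CFB, b'This is an IV456')
    return obj.decrypt(ciphertext).decode('utf-8')
Note that the fixed IV is inherited from the example above; in CFB mode the IV should normally be unique per message.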
Upvotes: 0