geekygeek
geekygeek

Reputation: 751

How to encrypt a file that is in memory?

I have files that I have read into memory using the following.

with open("myfile.txt", 'r') as FID: 
    data = FID.read()
file_data = base64.urlsafe_b64decode(data.encode('UTF-8'))

I would like to encrypt the contents of this file using 7zip and AES256. I would like it to be equivalent to when using 7zip on desktop.

For this purpose, I found py7zr.

The example that is provided for this library is as follows.

import py7zr


with py7zr.SevenZipFile('target.7z', 'w', password='secret') as archive:
    archive.writeall('/path/to/base_dir', 'base')

I imagine that I could use io.StringIO. The following could be the "file" I write to.

encrypted_data = io.StringIO()

How do I go from here?

I intend to upload this as a 7zip file to Google Drive. Since the file is already in memory, I would just want to encrypt it in memory and pass it along to the below Google Drive function. As can be seen, I pass in my file as bytes in memory to upload it to Google Drive.

def GDrive_UploadFile_FromMemory(self, fileInstance, filename, parentIDs = None):
    try: 
        filename_pathlib = pathlib.PurePath(filename)
        filename_no_extension = filename_pathlib.stem
        mimeType = self.GDrive_ExtensionToMime(filename)
        if type(fileInstance) == str: 
            fileInstance = fileInstance.encode()
        media = MediaIoBaseUpload(io.BytesIO(fileInstance), mimetype=mimeType, resumable=True)
        body = {'name' : filename_no_extension}
        if type(parentIDs) is str: 
            parentIDs = [parentIDs]
        elif parentIDs is not None and type(parentIDs) != list(): 
            self.GDLogger.error(f"ParentIDs [{parentIDs}] was not valid. Trying to upload {filename}")
        if parentIDs is not None: 
            body["parents"] = parentIDs
            
        request = self.GDrive_Service.files().create(media_body=media, body=body)
        response = None
        while response is None: 
            status, response = request.next_chunk()
            if status: 
                self.GDLogger.info(f"Upload Status for file [{filename}] : {status.progress()*100}")
        
        return response.get('id') 
    except: 
        self.GDLogger.error("An error has occurred in GDrive_UploadFile")
        tb = traceback.format_exc()
        self.GDLogger.exception(tb)
        return False 

Upvotes: 2

Views: 590

Answers (1)

Joshua
Joshua

Reputation: 561

Encrypt it and the delete the un-encrypted variable

Now you memory does not have the un-encrypted variable and is safe.

P.S.: note the inline comment. You should save that parameter to be able to decrypt later.

with open("myfile.txt", 'r') as FID: 
    data = FID.read()
unencrypted_variable = base64.urlsafe_b64decode(data.encode('UTF-8'))
encrypted_variable = aes_gcm_encrypt(unencrypted_variable,secrets.token_bytes(32)) #save the 2nd parameter to file later for decrypt
del unencrypted_variable

Code for the AES256-GCM function (You said you wanted AES256):

import binascii, time
from base64 import urlsafe_b64encode as b64e, urlsafe_b64decode as b64d

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidTag

backend = default_backend()

def aes_gcm_encrypt(message: bytes, key: bytes) -> bytes:
    current_time = int(time.time()).to_bytes(8, 'big')
    algorithm = algorithms.AES(key)
    iv = secrets.token_bytes(algorithm.block_size // 8)
    cipher = Cipher(algorithm, modes.GCM(iv), backend=backend)
    encryptor = cipher.encryptor()
    encryptor.authenticate_additional_data(current_time)
    ciphertext = encryptor.update(message) + encryptor.finalize()        
    return b64e(current_time + iv + ciphertext + encryptor.tag)

def aes_gcm_decrypt(token: bytes, key: bytes, ttl=None) -> bytes:
    algorithm = algorithms.AES(key)
    try:
        data = b64d(token)
    except (TypeError, binascii.Error):
        raise InvalidToken
    timestamp, iv, tag = data[:8], data[8:algorithm.block_size // 8 + 8], data[-16:]
    if ttl is not None:
        current_time = int(time.time())
        time_encrypted, = int.from_bytes(data[:8], 'big')
        if time_encrypted + ttl < current_time or current_time + 60 < time_encrypted:
            raise InvalidToken
    cipher = Cipher(algorithm, modes.GCM(iv, tag), backend=backend)
    decryptor = cipher.decryptor()
    decryptor.authenticate_additional_data(timestamp)
    ciphertext = data[8 + len(iv):-16]
    return decryptor.update(ciphertext) + decryptor.finalize()

Upvotes: 1

Related Questions