Reputation: 751
I have files that I have read into memory using the following.
# Read the whole text file; its content is expected to be URL-safe base64 text.
with open("myfile.txt", 'r') as FID:
    data = FID.read()
# `data` is a str: encode it to bytes, then decode the base64 payload to raw bytes.
file_data = base64.urlsafe_b64decode(data.encode('UTF-8'))
I would like to encrypt the contents of this file using 7zip and AES256. I would like it to be equivalent to when using 7zip on desktop.
For this purpose, I found py7zr.
The example that is provided for this library is as follows.
import py7zr
# Open target.7z for writing with a password (presumably what enables
# encryption of the archive - confirm against the py7zr documentation),
# then add the directory tree under the archive name 'base'.
with py7zr.SevenZipFile('target.7z', 'w', password='secret') as archive:
    archive.writeall('/path/to/base_dir', 'base')
I imagine that I could use io.StringIO. The following could be the "file" I write to.
# NOTE(review): a .7z archive is binary data, so an io.BytesIO buffer is
# presumably what is needed here rather than io.StringIO - confirm against py7zr.
encrypted_data = io.StringIO()
How do I go from here?
I intend to upload this as a 7zip file to Google Drive. Since the file is already in memory, I would just want to encrypt it in memory and pass it along to the below Google Drive function. As can be seen, I pass in my file as bytes in memory to upload it to Google Drive.
def GDrive_UploadFile_FromMemory(self, fileInstance, filename, parentIDs = None):
    """Upload an in-memory payload to Google Drive as a resumable upload.

    Args:
        fileInstance: File content as ``bytes``; a ``str`` is encoded with
            ``str.encode()`` (UTF-8 by default) before uploading.
        filename: Name used to look up the MIME type via
            ``self.GDrive_ExtensionToMime``; the Drive file name is its stem
            (the name without the final extension).
        parentIDs: ``None``, a single parent ID string, or a list of parent
            IDs. Any other type is logged as invalid and still passed through
            to the API unchanged.

    Returns:
        The ``'id'`` entry of the create response on success, or ``False``
        if any exception was raised (the exception is logged, not re-raised).
    """
    try:
        filename_no_extension = pathlib.PurePath(filename).stem
        mimeType = self.GDrive_ExtensionToMime(filename)

        # io.BytesIO needs bytes, so text payloads are encoded first.
        if isinstance(fileInstance, str):
            fileInstance = fileInstance.encode()
        media = MediaIoBaseUpload(io.BytesIO(fileInstance), mimetype=mimeType, resumable=True)

        body = {'name' : filename_no_extension}
        if isinstance(parentIDs, str):
            # A single ID is wrapped so "parents" is always sent as a list.
            parentIDs = [parentIDs]
        elif parentIDs is not None and not isinstance(parentIDs, list):
            # The previous check compared the type against an empty list
            # instance, which is never equal, so valid lists were reported too.
            self.GDLogger.error(f"ParentIDs [{parentIDs}] was not valid. Trying to upload {filename}")
        if parentIDs is not None:
            body["parents"] = parentIDs

        request = self.GDrive_Service.files().create(media_body=media, body=body)
        response = None
        # next_chunk() is called until it yields a response; a truthy status
        # in the meantime is used to report progress.
        while response is None:
            status, response = request.next_chunk()
            if status:
                self.GDLogger.info(f"Upload Status for file [{filename}] : {status.progress()*100}")
        return response.get('id')
    except Exception:
        # `except Exception` lets KeyboardInterrupt/SystemExit propagate;
        # exception() is expected to record the active traceback itself, as
        # the standard logging API does, so it is not formatted by hand.
        self.GDLogger.exception("An error has occurred in GDrive_UploadFile")
        return False
Upvotes: 2
Views: 590
Reputation: 561
Encrypt it and then delete the unencrypted variable.
Now your memory no longer holds the unencrypted variable and is safe.
P.S.: note the inline comment. You should save that parameter to be able to decrypt later.
with open("myfile.txt", 'r') as FID:
    data = FID.read()
unencrypted_variable = base64.urlsafe_b64decode(data.encode('UTF-8'))
# Keep the key in a variable: generated inline inside the call it would be
# discarded and the ciphertext could never be decrypted.
key = secrets.token_bytes(32)  # 32 random bytes = AES-256 key; save it to a file later for decrypt
encrypted_variable = aes_gcm_encrypt(unencrypted_variable, key)
# `data` holds the same plaintext in base64 form, so it is dropped as well.
# NOTE: `del` only removes the names; it does not wipe the underlying memory.
del unencrypted_variable, data
Code for the AES256-GCM function (You said you wanted AES256):
import binascii, time
import secrets
from base64 import urlsafe_b64encode as b64e, urlsafe_b64decode as b64d

from cryptography.exceptions import InvalidTag
from cryptography.fernet import InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Backend object shared by the Cipher instances created in the functions below.
backend = default_backend()
def aes_gcm_encrypt(message: bytes, key: bytes) -> bytes:
    """Encrypt *message* with AES-GCM and return a URL-safe base64 token.

    The decoded token is laid out as::

        8-byte big-endian timestamp | IV | ciphertext | GCM tag

    The timestamp is fed to the cipher as additional authenticated data and
    stored in the token unencrypted.

    Args:
        message: Plaintext bytes to encrypt.
        key: AES key; its length selects the AES variant (32 bytes for
            AES-256). It is needed again to decrypt.

    Returns:
        The URL-safe base64 encoding of the token bytes.
    """
    # Whole seconds since the epoch, fixed at 8 bytes so the decrypt side can
    # slice it off by position.
    timestamp = int(time.time()).to_bytes(8, 'big')
    algorithm = algorithms.AES(key)
    # block_size is in bits; a fresh random IV of one block is drawn per call.
    iv = secrets.token_bytes(algorithm.block_size // 8)
    encryptor = Cipher(algorithm, modes.GCM(iv), backend=backend).encryptor()
    encryptor.authenticate_additional_data(timestamp)
    ciphertext = encryptor.update(message) + encryptor.finalize()
    # The tag is read only after finalize() has been called.
    return b64e(b"".join((timestamp, iv, ciphertext, encryptor.tag)))
def aes_gcm_decrypt(token: bytes, key: bytes, ttl=None) -> bytes:
    """Decrypt a token produced by ``aes_gcm_encrypt``.

    Args:
        token: URL-safe base64 token whose decoded layout is
            ``8-byte timestamp | IV | ciphertext | 16-byte tag``.
        key: The AES key the token was encrypted with.
        ttl: Optional maximum token age in seconds. When given, a token older
            than ``ttl`` seconds, or timestamped more than 60 seconds in the
            future, is rejected.

    Returns:
        The decrypted plaintext bytes.

    Raises:
        InvalidToken: If the token is not valid base64, is too short to hold
            the fixed-size fields, or fails the ``ttl`` check.

    NOTE(review): an authentication failure is expected to surface from
    ``finalize()`` as the library's own exception (``InvalidTag``); it is not
    translated here - confirm against the cryptography documentation.
    """
    algorithm = algorithms.AES(key)
    try:
        data = b64d(token)
    except (TypeError, binascii.Error) as err:
        raise InvalidToken from err

    timestamp_len = 8
    iv_len = algorithm.block_size // 8  # block_size is in bits
    tag_len = 16
    # A truncated token would otherwise yield short/overlapping slices and
    # fail later with an unrelated library error.
    if len(data) < timestamp_len + iv_len + tag_len:
        raise InvalidToken

    timestamp = data[:timestamp_len]
    iv = data[timestamp_len:timestamp_len + iv_len]
    tag = data[-tag_len:]

    if ttl is not None:
        current_time = int(time.time())
        # int.from_bytes returns a plain int; it must not be tuple-unpacked.
        time_encrypted = int.from_bytes(timestamp, 'big')
        if time_encrypted + ttl < current_time or current_time + 60 < time_encrypted:
            raise InvalidToken

    cipher = Cipher(algorithm, modes.GCM(iv, tag), backend=backend)
    decryptor = cipher.decryptor()
    # The timestamp was authenticated as additional data on encryption, so
    # the same bytes are supplied here.
    decryptor.authenticate_additional_data(timestamp)
    ciphertext = data[timestamp_len + iv_len:-tag_len]
    return decryptor.update(ciphertext) + decryptor.finalize()
Upvotes: 1