Reputation: 5023
Currently we're using an approach of putting CA Certificates on the server to access third party APIs.
certificate_path = os.path.join(CERT_PATH, 'cacert.pem')
certificate_key_path = os.path.join(CERT_PATH, 'cacert.key')
response = requests.get(url, cert=(certificate_path, certificate_key_path))
This works,But we're looking for instead of storing CA certificates on the server, store in the Accounts
Table in the database for security purposes (security cause raised by Customer).
So the questions are:
Is there any approach we can directly pass CA cert's string to the requests
directly (other than writing content in to a temp file)?
Is any other http
python module support passing CA cert's string in the http
get/post request?
Is there any other approach we should use instead of storing them in the database and on the server?
Upvotes: 13
Views: 24450
Reputation: 3088
If you're using Python 3.12, you can (securely) use temp files. Example:
with tempfile.NamedTemporaryFile('w', delete_on_close=False) as cert_file:
cert_file.write(cert_str)
cert_file.close()
with tempfile.NamedTemporaryFile('w', delete_on_close=False) as key_file:
key_file.write(key_str)
key_file.close()
response = requests.get(url, cert=(cert_file.name, key_file.name))
This depends on the new delete_on_close
option on NamedTemporaryFile
.
The security concerns linked to in other answers are all about the temp files not being removed, or not being deleted because an exception is thrown. This code will delete the file when the context manager exits (after request.get()
), because delete=True
is still set (its default value).
Prior to 3.12, file.close()
would trigger deleting the file. (Making it useless for this case!)
Upvotes: 0
Reputation: 648
Since i cannot have additional dependencies as openssl, here is alternative using requests
library and temporary file:
import contextlib
import os
from tempfile import NamedTemporaryFile
import requests
cert_key_str = """
-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----
"""
cert_str = """
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
"""
url = "https://..."
@contextlib.contextmanager
def create_temporary_file(suffix=None):
"""
Context that introduces a temporary file.
Adapted from:
https://stackoverflow.com/a/57701186/5818549
Creates a temporary file, yields its name, and upon context exit, deletes it.
(In contrast, tempfile.NamedTemporaryFile() provides a 'file' object and
deletes the file as soon as that file object is closed, so the temporary file
cannot be safely re-opened by another library or process.)
Args:
suffix: desired filename extension (e.g. '.mp4').
Yields:
The name of the temporary file.
"""
try:
temporaty_file = NamedTemporaryFile(suffix=suffix, delete=False)
temporaty_filename = temporaty_file.name
temporaty_file.close()
yield temporaty_filename
finally:
os.unlink(temporaty_filename)
with (
create_temporary_file() as cert_filename,
create_temporary_file() as key_filename
):
with (
open(cert_filename, "w") as cert_file,
open(key_filename, "w") as key_file
):
cert_file.write(cert_str)
key_file.write(cert_key_str)
final_conn = requests.get(
url,
cert=(cert_filename, key_filename),
)
print(final_conn.text)
Upvotes: 1
Reputation: 171
I took a different approach and used the init_poolmanager to set the ssl context. I avoid patching, so it will only work with Session objects.
E.x.:
#pip install requests pyOpenSSL
import OpenSSL
import requests
import requests.hooks
from urllib3 import Retry
from urllib3.contrib.pyopenssl import PyOpenSSLContext
from urllib3.util.ssl_ import create_urllib3_context
class ClientSideCertificateHTTPAdapter(requests.adapters.HTTPAdapter):
DEFAULT_PROTOCOL = create_urllib3_context().protocol
def __init__(self, *args, cert, key, protocol=DEFAULT_PROTOCOL, **kwargs):
self._cert = cert
self._key = key
self._protocol = protocol
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
ctx = PyOpenSSLContext(self._protocol)
kwargs["ssl_context"] = ctx
ctx._ctx.use_certificate(self._cert)
ctx._ctx.use_privatekey(self._key)
return super().init_poolmanager(*args, **kwargs)
def main():
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----")
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----", b"passphrase_goes_here")
adapter = ClientSideCertificateHTTPAdapter(cert=cert, key=key, max_retries=Retry(total=10, backoff_factor=0.5))
session = requests.Session()
session.mount("https://www.hotmail.com/", adapter)
session.get("https://www.hotmail.com/api/v2/mail")
if __name__ == "__main__":
main()
Upvotes: 4
Reputation: 838
The example you have provided is passing a client-side cert as shown in the requests documentation.
As it stands there is no way to pass the client cert and key in memory (or as a string).
Monkey patching to the rescue - by monkey patching requests
you can add the ability to load client certs and keys from memory. The following patch enables passing in a client cert and key in a variety formats without breaking the existing functionality.
import requests
from OpenSSL.crypto import PKCS12, X509, PKey
def _is_key_file_encrypted(keyfile):
'''In memory key is not encrypted'''
if isinstance(keyfile, PKey):
return False
return _is_key_file_encrypted.original(keyfile)
class PyOpenSSLContext(requests.packages.urllib3.contrib.pyopenssl.PyOpenSSLContext):
'''Support loading certs from memory'''
def load_cert_chain(self, certfile, keyfile=None, password=None):
if isinstance(certfile, X509) and isinstance(keyfile, PKey):
self._ctx.use_certificate(certfile)
self._ctx.use_privatekey(keyfile)
else:
super().load_cert_chain(certfile, keyfile=keyfile, password=password)
class HTTPAdapter(requests.adapters.HTTPAdapter):
'''Handle a variety of cert types'''
def cert_verify(self, conn, url, verify, cert):
if cert:
# PKCS12
if isinstance(cert, PKCS12):
conn.cert_file = cert.get_certificate()
conn.key_file = cert.get_privatekey()
cert = None
elif isinstance(cert, tuple) and len(cert) == 2:
# X509 and PKey
if isinstance(cert[0], X509) and hasattr(cert[1], PKey):
conn.cert_file = cert[0]
conn.key_file = cert[1]
cert = None
# cryptography objects
elif hasattr(cert[0], 'public_bytes') and hasattr(cert[1], 'private_bytes'):
conn.cert_file = X509.from_cryptography(cert[0])
conn.key_file = PKey.from_cryptography_key(cert[1])
cert = None
super().cert_verify(conn, url, verify, cert)
def patch_requests(adapter=True):
'''You can perform a full patch and use requests as usual:
>>> patch_requests()
>>> requests.get('https://httpbin.org/get')
or use the adapter explicitly:
>>> patch_requests(adapter=False)
>>> session = requests.Session()
>>> session.mount('https', HTTPAdapter())
>>> session.get('https://httpbin.org/get')
'''
if hasattr(requests.packages.urllib3.util.ssl_, '_is_key_file_encrypted'):
_is_key_file_encrypted.original = requests.packages.urllib3.util.ssl_._is_key_file_encrypted
requests.packages.urllib3.util.ssl_._is_key_file_encrypted = _is_key_file_encrypted
requests.packages.urllib3.util.ssl_.SSLContext = PyOpenSSLContext
if adapter:
requests.sessions.HTTPAdapter = HTTPAdapter
To use the patch you can do something like the following (assume the above code is in a file called patch.py
)
import os
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from patch import patch_requests
CLIENT_CERT = serialization.load_pem_x509_certificate(
os.getenv('CLIENT_CERT'), default_backend())
CLIENT_KEY = serialization.load_pem_private_key(
os.getenv('CLIENT_KEY'), None, default_backend())
# monkey patch load_cert_chain to allow loading
# cryptography certs and keys from memory
patch_requests()
response = requests.get(url, cert=(CLIENT_CERT, CLIENT_KEY))
You now have the ability to supply a client cert to requests in memory in as pyopenssl
object(s) or cryptography
objects.
Upvotes: 9
Reputation: 96
If one wants to do this without using temporary file, it is possible by overriding the requests SSLContext. Sample can be seen in this answer.
Upvotes: 4
Reputation: 473
There is a way to do it via temp files, like this:
cert = tempfile.NamedTemporaryFile(delete=False)
cert.write(CERTIFICATE_AS_STRING)
cert.close()
requests.get(url, cert=cert.name, verify=True)
os.unlink(cert.name)
If you'd like to know why this is potentially unsecure, check out my answer here: https://stackoverflow.com/a/46570264/6445270
Upvotes: 7