Laxmikant Ratnaparkhi
Laxmikant Ratnaparkhi

Reputation: 5023

Python requests CA certificates as a string

Currently we're using an approach of putting CA Certificates on the server to access third party APIs.

certificate_path = os.path.join(CERT_PATH, 'cacert.pem')
certificate_key_path = os.path.join(CERT_PATH, 'cacert.key')
response = requests.get(url, cert=(certificate_path, certificate_key_path))

This works,But we're looking for instead of storing CA certificates on the server, store in the Accounts Table in the database for security purposes (security cause raised by Customer).

So the questions are:

Upvotes: 13

Views: 24450

Answers (6)

keredson
keredson

Reputation: 3088

If you're using Python 3.12, you can (securely) use temp files. Example:

with tempfile.NamedTemporaryFile('w', delete_on_close=False) as cert_file:
  cert_file.write(cert_str)
  cert_file.close()
  with tempfile.NamedTemporaryFile('w', delete_on_close=False) as key_file:
    key_file.write(key_str)
    key_file.close()
    response = requests.get(url, cert=(cert_file.name, key_file.name))

This depends on the new delete_on_close option on NamedTemporaryFile.

The security concerns linked to in other answers are all about the temp files not being removed, or not being deleted because an exception is thrown. This code will delete the file when the context manager exits (after request.get()), because delete=True is still set (its default value).

Prior to 3.12, file.close() would trigger deleting the file. (Making it useless for this case!)

Upvotes: 0

Ilja Leiko
Ilja Leiko

Reputation: 648

Since i cannot have additional dependencies as openssl, here is alternative using requests library and temporary file:

import contextlib
import os
from tempfile import NamedTemporaryFile

import requests

cert_key_str = """
-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----
"""

cert_str = """
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
"""

url = "https://..."


@contextlib.contextmanager
def create_temporary_file(suffix=None):
    """
    Context that introduces a temporary file.
    Adapted from:
    https://stackoverflow.com/a/57701186/5818549

    Creates a temporary file, yields its name, and upon context exit, deletes it.
    (In contrast, tempfile.NamedTemporaryFile() provides a 'file' object and
    deletes the file as soon as that file object is closed, so the temporary file
    cannot be safely re-opened by another library or process.)

    Args:
      suffix: desired filename extension (e.g. '.mp4').

    Yields:
      The name of the temporary file.
    """
    try:
        temporaty_file = NamedTemporaryFile(suffix=suffix, delete=False)
        temporaty_filename = temporaty_file.name
        temporaty_file.close()
        yield temporaty_filename
    finally:
        os.unlink(temporaty_filename)


with (
    create_temporary_file() as cert_filename,
    create_temporary_file() as key_filename
):
    with (
        open(cert_filename, "w") as cert_file,
        open(key_filename, "w") as key_file
    ):
        cert_file.write(cert_str)
        key_file.write(cert_key_str)

    final_conn = requests.get(
        url,
        cert=(cert_filename, key_filename),
    )
    print(final_conn.text)

Upvotes: 1

hb2638
hb2638

Reputation: 171

I took a different approach and used the init_poolmanager to set the ssl context. I avoid patching, so it will only work with Session objects.

E.x.:

#pip install requests pyOpenSSL

import OpenSSL
import requests
import requests.hooks
from urllib3 import Retry
from urllib3.contrib.pyopenssl import PyOpenSSLContext
from urllib3.util.ssl_ import create_urllib3_context


class ClientSideCertificateHTTPAdapter(requests.adapters.HTTPAdapter):
    DEFAULT_PROTOCOL = create_urllib3_context().protocol

    def __init__(self, *args, cert, key, protocol=DEFAULT_PROTOCOL, **kwargs):
        self._cert = cert
        self._key = key
        self._protocol = protocol
        super().__init__(*args, **kwargs)


    def init_poolmanager(self, *args, **kwargs):
        ctx = PyOpenSSLContext(self._protocol)
        kwargs["ssl_context"] = ctx
        ctx._ctx.use_certificate(self._cert)
        ctx._ctx.use_privatekey(self._key)
        return super().init_poolmanager(*args, **kwargs)


def main():
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----")
    key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----", b"passphrase_goes_here")

    adapter = ClientSideCertificateHTTPAdapter(cert=cert, key=key, max_retries=Retry(total=10, backoff_factor=0.5))
    session = requests.Session()
    session.mount("https://www.hotmail.com/", adapter)
    session.get("https://www.hotmail.com/api/v2/mail")


if __name__ == "__main__":
    main()

Upvotes: 4

greenbender
greenbender

Reputation: 838

The example you have provided is passing a client-side cert as shown in the requests documentation.

As it stands there is no way to pass the client cert and key in memory (or as a string).

Monkey patching to the rescue - by monkey patching requests you can add the ability to load client certs and keys from memory. The following patch enables passing in a client cert and key in a variety formats without breaking the existing functionality.

import requests
from OpenSSL.crypto import PKCS12, X509, PKey


def _is_key_file_encrypted(keyfile):
    '''In memory key is not encrypted'''
    if isinstance(keyfile, PKey):
        return False
    return _is_key_file_encrypted.original(keyfile)


class PyOpenSSLContext(requests.packages.urllib3.contrib.pyopenssl.PyOpenSSLContext):
    '''Support loading certs from memory'''
    def load_cert_chain(self, certfile, keyfile=None, password=None):
        if isinstance(certfile, X509) and isinstance(keyfile, PKey):
            self._ctx.use_certificate(certfile)
            self._ctx.use_privatekey(keyfile)
        else:
            super().load_cert_chain(certfile, keyfile=keyfile, password=password)


class HTTPAdapter(requests.adapters.HTTPAdapter):
    '''Handle a variety of cert types'''
    def cert_verify(self, conn, url, verify, cert):
        if cert:
            # PKCS12
            if isinstance(cert, PKCS12):
                conn.cert_file = cert.get_certificate()
                conn.key_file = cert.get_privatekey()
                cert = None
            elif isinstance(cert, tuple) and len(cert) == 2:
                # X509 and PKey
                if isinstance(cert[0], X509) and hasattr(cert[1], PKey):
                    conn.cert_file = cert[0]
                    conn.key_file = cert[1]
                    cert = None
                # cryptography objects
                elif hasattr(cert[0], 'public_bytes') and hasattr(cert[1], 'private_bytes'):
                    conn.cert_file = X509.from_cryptography(cert[0])
                    conn.key_file = PKey.from_cryptography_key(cert[1])
                    cert = None
        super().cert_verify(conn, url, verify, cert)


def patch_requests(adapter=True):
    '''You can perform a full patch and use requests as usual:

    >>> patch_requests()
    >>> requests.get('https://httpbin.org/get')

    or use the adapter explicitly:

    >>> patch_requests(adapter=False)
    >>> session = requests.Session()
    >>> session.mount('https', HTTPAdapter())
    >>> session.get('https://httpbin.org/get')
    '''
    if hasattr(requests.packages.urllib3.util.ssl_, '_is_key_file_encrypted'):
        _is_key_file_encrypted.original = requests.packages.urllib3.util.ssl_._is_key_file_encrypted
        requests.packages.urllib3.util.ssl_._is_key_file_encrypted = _is_key_file_encrypted
    requests.packages.urllib3.util.ssl_.SSLContext = PyOpenSSLContext
    if adapter:
        requests.sessions.HTTPAdapter = HTTPAdapter

To use the patch you can do something like the following (assume the above code is in a file called patch.py)

import os
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from patch import patch_requests


CLIENT_CERT = serialization.load_pem_x509_certificate(
    os.getenv('CLIENT_CERT'), default_backend())
CLIENT_KEY = serialization.load_pem_private_key(
    os.getenv('CLIENT_KEY'), None, default_backend())


# monkey patch load_cert_chain to allow loading
# cryptography certs and keys from memory
patch_requests()


response = requests.get(url, cert=(CLIENT_CERT, CLIENT_KEY))

You now have the ability to supply a client cert to requests in memory in as pyopenssl object(s) or cryptography objects.

Upvotes: 9

Kaur J
Kaur J

Reputation: 96

If one wants to do this without using temporary file, it is possible by overriding the requests SSLContext. Sample can be seen in this answer.

Upvotes: 4

Dmitry Orlov
Dmitry Orlov

Reputation: 473

There is a way to do it via temp files, like this:

cert = tempfile.NamedTemporaryFile(delete=False)
cert.write(CERTIFICATE_AS_STRING)
cert.close()
requests.get(url, cert=cert.name, verify=True)
os.unlink(cert.name)

If you'd like to know why this is potentially unsecure, check out my answer here: https://stackoverflow.com/a/46570264/6445270

Upvotes: 7

Related Questions