Seraf

Reputation: 950

HSM integration with Python requests module

So I'm writing an application that needs to authenticate to a server using a client certificate (mutual authentication). The client certificate's key is stored in an HSM (Gemalto's). I did the OpenSSL integration with the Luna Client, but the requests module requires a file:

from requests import Session

session: Session = Session()

session.cert = (
    "/ssl/client.pem",
    "/ssl/client.key"
)
session.verify = "/ssl/server.pem"

My issue is that I could not find a way to bind the private key when it's in the HSM. Here's what I tried so far with the pycryptoki library:

from pycryptoki.session_management import (
    c_initialize_ex,
    c_open_session_ex,
    login_ex,
    c_logout_ex,
    c_close_session_ex,
    c_finalize_ex,
)
from requests import Session

c_initialize_ex()
auth_session = c_open_session_ex(0)
login_ex(auth_session, 0, "some-pass")

session: Session = Session()

session.cert = (
    "/ssl/client.pem",
    "rsa-private-156405640312",
)
session.verify = "/ssl/server.pem"

...

c_logout_ex(auth_session)
c_close_session_ex(auth_session)
c_finalize_ex()

I opened an issue there a while back. I had to finish the app implementation, so I put the HSM integration on ice, but I need to make it work before going to production: https://github.com/gemalto/pycryptoki/issues/17

I also tried py-hsm, but it is a low-level API library. I opened an issue there as well, with an example of my code:

from pyhsm.hsmclient import HsmClient
from requests import Session

c = HsmClient(pkcs11_lib="/usr/lib/libCryptoki2_64.so")
c.open_session(slot=0)
c.login(pin="some-code")

session: Session = Session()
session.cert = "/ssl/client.pem"

c.logout()
c.close_session()

Can anyone provide an example of mutual authentication with the certificate pair in an HSM? If you have something in C/C++, that would be great too; I could implement my request function there and just wrap it in my Python code.

Thank you in advance!

Upvotes: 2

Views: 3513

Answers (3)

wistfulastronaut

Reputation: 1

If I'm understanding correctly and what you need is to present a certificate signed by the private key in the HSM - because presumably the self-signed cert from the HSM is trusted by the server - I think I solved this particular problem (though if I don't understand correctly I've solved a different problem...). The (messy) solution looks something like the following:

  1. Hand-build a TBS (to-be-signed) certificate using the pyasn1 library (this is the raw ASN.1 data that a CSR presents for signing). This involves creating a TBS header with (I believe) at least the CN and validity dates, then appending the public key for signing.
  2. Look up the handle for the HSM's private key by label
  3. Hash the TBS cert with the appropriate function
  4. Sign the TBS cert hash with the HSM's private key
  5. Append the signature to the TBS cert in the appropriate field to complete the full X.509 cert
  6. Optionally, export the HSM's self-signed cert if you need to present the full chain

from datetime import datetime, timedelta
import hashlib
from pyasn1.type.univ import BitString, ObjectIdentifier
from pyasn1.type import useful
from pyasn1.codec.der import encoder, decoder
from pyasn1_modules import rfc3280, rfc4055
from pycryptoki.defines import (
    CKA_LABEL,
    CKA_CLASS,
    CKO_PRIVATE_KEY,
    CKM_RSA_PKCS_PSS,
    CKM_SHA256,
    CKG_MGF1_SHA256,
)
from pycryptoki.mechanism import Mechanism
from pycryptoki.object_attr_lookup import c_find_objects_ex
from pycryptoki.sign_verify import c_sign_ex

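# Assumed to be defined by the caller: public_key_modulus and public_key_exponent (bytes),
# issuer_common_name, cert_common_name, serial_number, validity_in_years, and an open,
# logged-in pycryptoki `session`.
pub_key = rfc4055.RSAPublicKey()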
pub_key['modulus'] = int.from_bytes(public_key_modulus, byteorder="big")
pub_key['publicExponent'] = int.from_bytes(public_key_exponent, byteorder="big")
pub_key_der = encoder.encode(pub_key)

publicKeyInfo = rfc3280.SubjectPublicKeyInfo()
publicKeyInfo['algorithm'] = rfc4055.rSASSA_PSS_SHA256_Identifier
publicKeyInfo['subjectPublicKey'] = BitString.fromOctetString(pub_key_der)

issuer = rfc3280.Name().setComponents(
    rfc3280.RDNSequence().setComponents(
        rfc3280.RelativeDistinguishedName().setComponents(
            rfc3280.AttributeTypeAndValue().setComponents(
                ObjectIdentifier(rfc3280.id_at_commonName),
                rfc3280.CommonName(issuer_common_name)
            )
        )
    )
)

subject = rfc3280.Name().setComponents(
    rfc3280.RDNSequence().setComponents(
        rfc3280.RelativeDistinguishedName().setComponents(
            rfc3280.AttributeTypeAndValue().setComponents(
                ObjectIdentifier(rfc3280.id_at_commonName),
                rfc3280.CommonName(cert_common_name)
            )
        )
    )
)

now = datetime.utcnow()
validity = rfc3280.Validity()
validity['notBefore'] = rfc3280.Time().setComponentByName('utcTime', useful.UTCTime.fromDateTime(now))
validity['notAfter']  = rfc3280.Time().setComponentByName('utcTime', useful.UTCTime.fromDateTime(now + timedelta(days=validity_in_years * 365)))

body = rfc3280.TBSCertificate()
body['serialNumber'] = serial_number
body['signature'] = rfc4055.rSASSA_PSS_SHA256_Identifier
body['issuer'] = issuer
body['validity'] = validity
body['subject'] = subject
body['subjectPublicKeyInfo'] = publicKeyInfo

cert_info = rfc3280.Certificate()
cert_info['tbsCertificate'] = body
cert_info['signatureAlgorithm'] = rfc4055.rSASSA_PSS_SHA256_Identifier

tbs_cert_der = encoder.encode(body)

cert_hash = hashlib.sha256(tbs_cert_der).digest()

template = {
    CKA_CLASS: CKO_PRIVATE_KEY,
    CKA_LABEL: 'my_priv_key',
}
# Look up the private key handle by label (same `session` that is used for signing below)
priv = c_find_objects_ex(session, template, 1).pop(0)

mech = Mechanism(
    mech_type=CKM_RSA_PKCS_PSS,
    params={
        "hashAlg": CKM_SHA256,
        "mgf": CKG_MGF1_SHA256,
        "usSaltLen": len(cert_hash),
    }
)

signature = c_sign_ex(session, priv, cert_hash, mech)
cert_info['signature'] = BitString.fromOctetString(signature)
cert_der = encoder.encode(cert_info)
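
The `cert_der` bytes produced above are DER-encoded, while `requests` (as in the question's `session.cert`) expects a PEM file. As a minimal sketch, the standard library's `ssl.DER_cert_to_PEM_cert` can do the conversion. The `der_to_pem_file` helper name, the output path, and the placeholder bytes are assumptions for illustration, not part of the code above:

```python
import os
import ssl
import tempfile


def der_to_pem_file(der_bytes: bytes, path: str) -> str:
    """Write DER certificate bytes to `path` as PEM and return the PEM text."""
    pem_text = ssl.DER_cert_to_PEM_cert(der_bytes)
    with open(path, "w", encoding="ascii") as handle:
        handle.write(pem_text)
    return pem_text


# Placeholder bytes standing in for the real `cert_der` (hypothetical data).
example_der = bytes(range(48))
example_path = os.path.join(tempfile.gettempdir(), "client-example.pem")
example_pem = der_to_pem_file(example_der, example_path)
```

This only covers the certificate half of the pair; the private key never leaves the HSM, so it cannot be written to a file the same way.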

If instead what you need (or maybe both) is to present a certificate to the server so it can encrypt some data for you to decrypt in the HSM, you can create a self-signed cert in the HSM using the cmu utility (I suppose you could hand-build that too and sign the HSM's own public key with its private key if you don't want to use command-line utils) and pass that to the server.

Hope that helps! It cost me a lot of time and a hole in the wall from banging my head repeatedly...

Upvotes: 0

Sami ZRAFI

Reputation: 31

I have tested almost all of the Python wrappers for this. PyKCS11 is not really solid.

I recommend one of these two options:

1. PyCurl

Once OpenSSL (and therefore cURL) is correctly configured for PKCS#11, the Python wrapper for cURL can do this.

Here is a simple implementation:

import pycurl
from io import BytesIO
import pprint
import json


c = pycurl.Curl()
url = 'https://yourserver/endpoint'
c.setopt(pycurl.URL, url)

# WARNING: the next two options disable server certificate and hostname verification (testing only)
c.setopt(c.SSL_VERIFYPEER, 0)
c.setopt(c.SSL_VERIFYHOST, False)
c.setopt(c.VERBOSE, 0)

c.setopt(pycurl.SSLENGINE, 'pkcs11')
c.setopt(pycurl.SSLCERTTYPE, 'eng')
c.setopt(pycurl.SSLKEYTYPE, 'eng')

c.setopt(pycurl.SSLCERT, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
                        'token=AAAAA;id=BBBBBBBBB;'
                        'object=CCCCCC;type=cert;pin-value=pin-pin')
c.setopt(pycurl.SSLKEY, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
                        'token=AAAAA;id=BBBBBBBBB;'
                        'object=CCCCCC;type=private;pin-value=pin-pin')


# set headers
c.setopt(pycurl.HEADER, True)
# c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0')
c.setopt(pycurl.HTTPHEADER, ("HEADER_TO_ADD:VALUE",))

buffer = BytesIO()

c.setopt(c.WRITEDATA, buffer)
c.perform()
# HTTP response code, e.g. 200.
print('>>> Status: %d' % c.getinfo(c.RESPONSE_CODE))
# Elapsed time for the transfer.
print('>>> Time: %f' % c.getinfo(c.TOTAL_TIME))

# getinfo must be called before close.
c.close()

body = buffer.getvalue().decode('utf-8')
print('>>> Body:\n', body)

# HEADER is enabled above, so the response headers precede the JSON body; skip to the first '{'
if body.find('{') >= 0:
    body = body[body.find('{'):]

dictionary = json.loads(body)
pprint.pprint(dictionary)

Note that the PKCS#11 URI must comply with RFC 7512. The values for your token can be discovered using this command:

p11tool --provider=/usr/lib/libeTPkcs11.so --list-all

All fields, including the PIN, must be percent-encoded (URL-encoded). Any URL encoder can convert between the plain string (PIN) and its encoded form; for a real PIN, prefer a local tool over an online website.
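
The encoding can also be done locally in Python. A minimal sketch using the standard library's `urllib.parse.quote`; the `build_pkcs11_uri` helper and the sample attribute values are hypothetical, and the layout (every attribute separated by `;`) simply mirrors the URIs in the PyCurl example above:

```python
from urllib.parse import quote


def build_pkcs11_uri(attributes: dict) -> str:
    """Join attributes into a 'pkcs11:' URI, percent-encoding each value.

    quote(..., safe='') leaves only letters, digits and '_.-~' unescaped,
    so separators such as ';', '=', '/' and '@' inside a value are encoded.
    """
    parts = [f"{name}={quote(value, safe='')}" for name, value in attributes.items()]
    return "pkcs11:" + ";".join(parts)


# Hypothetical values for illustration only.
uri = build_pkcs11_uri({
    "token": "My Token",
    "object": "client-key",
    "type": "private",
    "pin-value": "p@ss;word/1",
})
print(uri)
```

The resulting string can be passed wherever the examples above use a hand-written `pkcs11:` URI.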

2. M2Crypto

In my opinion, this is the best PKCS#11 implementation for Python. It lets you override urllib2, and therefore requests calls, via the requests.Session.mount method and requests.adapters.BaseAdapter.

Here is some code that uses it with urllib2:

from M2Crypto import m2urllib2 as urllib2
from M2Crypto import m2, SSL, Engine


# load dynamic engine
e = Engine.load_dynamic_engine("pkcs11", "/usr/lib/x86_64-linux-gnu/engines-1.1/libpkcs11.so")
pk = Engine.Engine("pkcs11")
pk.ctrl_cmd_string("MODULE_PATH", "/usr/lib/libeTPkcs11.so")

m2.engine_init(m2.engine_by_id("pkcs11"))
pk.ctrl_cmd_string("PIN", 'pin-pin')
cert = pk.load_certificate('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
                        'token=AAAAA;id=BBBBBBBBB;'
                        'object=CCCCCC')
key = pk.load_private_key('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
                        'token=AAAAA;id=BBBBBBBBB;'
                        'object=CCCCCC', pin='pin-pin')


ssl_context = SSL.Context('tls')
ssl_context.set_cipher_list('EECDH+AESGCM:EECDH+aECDSA:EECDH+aRSA:EDH+AESGCM:EDH+aECDSA:EDH+aRSA:!SHA1:!SHA256:!SHA384:!MEDIUM:!LOW:!EXP:!aNULL:!eNULL:!PSK:!SRP:@STRENGTH')
ssl_context.set_default_verify_paths()
ssl_context.set_allow_unknown_ca(True)

# Disables M2Crypto's post-connection hostname check
SSL.Connection.postConnectionCheck = None

m2.ssl_ctx_use_x509(ssl_context.ctx, cert.x509)
m2.ssl_ctx_use_pkey_privkey(ssl_context.ctx, key.pkey)

opener = urllib2.build_opener(ssl_context)
urllib2.install_opener(opener)

url = 'https://yourserver/endpoint'

content = urllib2.urlopen(url=url).read()
# content = opener.open(url)
print(content)

Note that, unlike with PyCurl, we don't put the type and PIN in the PKCS#11 URI. The PIN is passed to load_private_key as a plain string, not URL-encoded.

Finally, I found a good implementation of an HTTP adapter to mount on requests for these calls. Here is the original implementation.

I added some lines to support the PIN code and to disable hostname checking with this:

M2Crypto.SSL.Connection.postConnectionCheck = None

And here is how to mount the adapter:

from requests import Session
from m2requests import M2HttpsAdapter
import pprint, json


request = Session()
m2httpsadapter = M2HttpsAdapter()
# Added by me to set a pin when loading private key
m2httpsadapter.pin = 'pin-pin'
# Need this attribute set to False else we cannot use direct IP (added by me to disable checking hostname)
m2httpsadapter.check_hostname = False
request.mount("https://", m2httpsadapter)

request.cert=('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
                        'token=AAAAA;id=BBBBBBBBB;'
                        'object=CCCCCC',
              'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
                        'token=AAAAA;id=BBBBBBBBB;'
                        'object=CCCCCC')

headers = {'HEADER_TO_ADD_IF_WANTED': 'VALUE', }
r = request.get("https://yourserver/endpoint", headers=headers, verify=False)

print(r.status_code)
pprint.pprint(json.loads(r.raw.data.decode('utf-8')))

Upvotes: 2

255

Reputation: 101

A private key in an HSM can be used to sign some data, which is what you are trying to accomplish for mutual authentication. Signing some data provided by the other party proves that you control the Private Key corresponding to the certificate.

From the HSM you can get a handle to the private key and use that handle to perform private key operations without exporting the key (the whole reason for using an HSM is to prevent the content of the private key from being seen). HSMs can expose a lot of different interfaces but PKCS#11 is by far the most common. You don't really need to use OpenSSL or to program in C/C++. Since you are using Python requests, there are some PKCS#11 Python libraries that can be used.

Please take a look at this Python PKCS11 library: https://github.com/danni/python-pkcs11.

You can then do things like this (assuming the key is RSA):

import pkcs11

lib = pkcs11.lib("/usr/lib/libCryptoki2_64.so")
token = lib.get_token(token_label='DEMO')

# Open a session on our token
with token.open(user_pin='1234') as session:
    # Get the key from the HSM by label
    priv = session.get_key(
            object_class=pkcs11.constants.ObjectClass.PRIVATE_KEY,
            label='key_label')
    # sign with that key using the required mechanism:
    signature = priv.sign(my_to_be_signed_data, mechanism=pkcs11.Mechanism.SHA256_RSA_PKCS)

You do not provide many details about this mutual authentication but hopefully, this should get you on the right track. The code above retrieves the key based on a label, but this PKCS#11 library also supports finding a key in the HSM based on a lot of other properties.

Upvotes: 1
