Reputation: 950
So I'm writing an application that needs to authenticate to a server using a client certificate (mutual authentication). The client certificate's private key is stored in an HSM (Gemalto's). I did the OpenSSL integration with the Luna Client, but the requests module expects the certificate and key as file paths:
from requests import Session
session: Session = Session()
session.cert = (
"/ssl/client.pem",
"/ssl/client.key"
)
session.verify = "/ssl/server.pem"
My issue is that I could not find a way to bind the private key when it's in the HSM. Here's what I tried so far with the pycryptoki library:
from pycryptoki.session_management import (
c_initialize_ex,
c_open_session_ex,
login_ex,
c_logout_ex,
c_close_session_ex,
c_finalize_ex,
)
from requests import Session
c_initialize_ex()
auth_session = c_open_session_ex(0)
login_ex(auth_session, 0, "some-pass")
session: Session = Session()
session.cert = (
"/ssl/client.pem",
"rsa-private-156405640312",
)
session.verify = "/ssl/server.pem"
...
c_logout_ex(auth_session)
c_close_session_ex(auth_session)
c_finalize_ex()
I opened an issue there a while back. I had to finish the app implementation, so I put the HSM integration on ice, but I need to make it work before going to production: https://github.com/gemalto/pycryptoki/issues/17
I also tried py-hsm, but it is a low-level API library. I opened an issue there as well, with an example of my code:
from pyhsm.hsmclient import HsmClient
from requests import Session
c = HsmClient(pkcs11_lib="/usr/lib/libCryptoki2_64.so")
c.open_session(slot=0)
c.login(pin="some-code")
session: Session = Session()
session.cert = "/ssl/client.pem"
c.logout()
c.close_session()
Can anyone provide an example of mutual authentication with the certificate pair in an HSM? Something in C/C++ would be great too; I could implement my request function there and just wrap it in my Python code.
Thank you in advance!
Upvotes: 2
Views: 3513
Reputation: 1
If I'm understanding correctly and what you need is to present a certificate signed by the private key in the HSM - because presumably the self-signed cert from the HSM is trusted by the server - I think I solved this particular problem (though if I don't understand correctly I've solved a different problem...). The (messy) solution looks something like the following:
from datetime import datetime, timedelta
import hashlib
from pyasn1.type.univ import BitString, ObjectIdentifier
from pyasn1.type import useful
from pyasn1.codec.der import encoder, decoder
from pyasn1_modules import rfc3280, rfc4055
from pycryptoki.defines import CKA_LABEL, CKA_CLASS, CKO_PRIVATE_KEY, CKM_RSA_PKCS_PSS, CKM_SHA256, CKG_MGF1_SHA256
from pycryptoki.mechanism import Mechanism
from pycryptoki.object_attr_lookup import c_find_objects_ex
from pycryptoki.sign_verify import c_sign_ex
# public_key_modulus and public_key_exponent must already be defined as big-endian byte strings
pub_key = rfc4055.RSAPublicKey()
pub_key['modulus'] = int.from_bytes(public_key_modulus, byteorder="big")
pub_key['publicExponent'] = int.from_bytes(public_key_exponent, byteorder="big")
pub_key_der = encoder.encode(pub_key)
publicKeyInfo = rfc3280.SubjectPublicKeyInfo()
publicKeyInfo['algorithm'] = rfc4055.rSASSA_PSS_SHA256_Identifier
publicKeyInfo['subjectPublicKey'] = BitString.fromOctetString(pub_key_der)
issuer = rfc3280.Name().setComponents(
rfc3280.RDNSequence().setComponents(
rfc3280.RelativeDistinguishedName().setComponents(
rfc3280.AttributeTypeAndValue().setComponents(
ObjectIdentifier(rfc3280.id_at_commonName),
rfc3280.CommonName(issuer_common_name)
)
)
)
)
subject = rfc3280.Name().setComponents(
rfc3280.RDNSequence().setComponents(
rfc3280.RelativeDistinguishedName().setComponents(
rfc3280.AttributeTypeAndValue().setComponents(
ObjectIdentifier(rfc3280.id_at_commonName),
rfc3280.CommonName(cert_common_name)
)
)
)
)
now = datetime.utcnow()
validity = rfc3280.Validity()
validity['notBefore'] = rfc3280.Time().setComponentByName('utcTime', useful.UTCTime.fromDateTime(now))
validity['notAfter'] = rfc3280.Time().setComponentByName('utcTime', useful.UTCTime.fromDateTime(now + timedelta(days=validity_in_years * 365)))
body = rfc3280.TBSCertificate()
body['serialNumber'] = serial_number
body['signature'] = rfc4055.rSASSA_PSS_SHA256_Identifier
body['issuer'] = issuer
body['validity'] = validity
body['subject'] = subject
body['subjectPublicKeyInfo'] = publicKeyInfo
cert_info = rfc3280.Certificate()
cert_info['tbsCertificate'] = body
cert_info['signatureAlgorithm'] = rfc4055.rSASSA_PSS_SHA256_Identifier
tbs_cert_der = encoder.encode(body)
cert_hash = hashlib.sha256(tbs_cert_der).digest()
template = {
CKA_CLASS: CKO_PRIVATE_KEY,
CKA_LABEL: 'my_priv_key',
}
priv = c_find_objects_ex(session, template, 1).pop(0)
mech = Mechanism(
mech_type=CKM_RSA_PKCS_PSS,
params={
"hashAlg": CKM_SHA256,
"mgf": CKG_MGF1_SHA256,
"usSaltLen": len(cert_hash),
}
)
signature = c_sign_ex(session, priv, cert_hash, mech)
cert_info['signature'] = BitString.fromOctetString(signature)
cert_der = encoder.encode(cert_info)
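The snippet above ends with DER bytes in cert_der, while requests (as in the question) is pointed at a PEM file. A minimal sketch of the conversion, using only the standard library; the helper name der_to_pem and the placeholder bytes are hypothetical and only there for illustration:

```python
import ssl


def der_to_pem(cert_der: bytes) -> str:
    """Wrap DER-encoded certificate bytes in PEM armor (base64 plus BEGIN/END lines)."""
    return ssl.DER_cert_to_PEM_cert(cert_der)


# Placeholder bytes standing in for the real cert_der built above (assumption).
placeholder_der = b"\x30\x03\x02\x01\x01"
pem_text = der_to_pem(placeholder_der)
print(pem_text)
```

The resulting text could then be written to a file such as /ssl/client.pem. This only covers the certificate; the private key stays in the HSM.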
If instead (or in addition) what you need is to present a certificate to the server so it can encrypt some data for you to decrypt in the HSM, you can create a self-signed cert in the HSM using the cmu utility (I suppose you could hand-build that too, signing the HSM's own public key with its private key, if you don't want to use command-line utils) and pass that to the server.
Hope that helps! It cost me a lot of time and a hole in the wall from banging my head repeatedly...
Upvotes: 0
Reputation: 31
I have tested almost all the Python wrappers that can do this. PyKCS11 is not really solid.
I recommend using one of these two options:
1. PyCurl
Once OpenSSL, and therefore cURL, is configured correctly, the Python wrapper for cURL can do this.
Here is a simple implementation:
import pycurl
from io import BytesIO
import pprint
import json
c = pycurl.Curl()
url = 'https://yourserver/endpoint'
c.setopt(pycurl.URL, url)
# Disable server certificate and hostname verification (insecure; testing only)
c.setopt(c.SSL_VERIFYPEER, 0)
c.setopt(c.SSL_VERIFYHOST, False)
c.setopt(c.VERBOSE, 0)
c.setopt(pycurl.SSLENGINE, 'pkcs11')
c.setopt(pycurl.SSLCERTTYPE, 'eng')
c.setopt(pycurl.SSLKEYTYPE, 'eng')
c.setopt(pycurl.SSLCERT, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC;type=cert;pin-value=pin-pin')
c.setopt(pycurl.SSLKEY, 'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC;type=private;pin-value=pin-pin')
# set headers
c.setopt(pycurl.HEADER, True)
# c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:8.0) Gecko/20100101 Firefox/8.0')
c.setopt(pycurl.HTTPHEADER, ("HEADER_TO_ADD:VALUE",))
buffer = BytesIO()
c.setopt(c.WRITEDATA, buffer)
c.perform()
# HTTP response code, e.g. 200.
print('>>> Status: %d' % c.getinfo(c.RESPONSE_CODE))
# Elapsed time for the transfer.
print('>>> Time: %f' % c.getinfo(c.TOTAL_TIME))
# getinfo must be called before close.
c.close()
body = buffer.getvalue().decode('utf-8')
print('>>> Body:\n', body)
if body.find('{') >= 0:
    body = body[body.find('{'):]
    dictionary = json.loads(body)
    pprint.pprint(dictionary)
Note that the PKCS#11 URI scheme must comply with RFC 7512. The URIs can be discovered using this command:
p11tool --provider=/usr/lib/libeTPkcs11.so --list-all
All fields, including the PIN, must be URL-encoded (percent-encoded). Any URL encoding tool can convert between the raw string (e.g. the PIN) and its encoded form.
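As a rough sketch of that encoding step, the standard library's urllib.parse.quote can percent-encode each field locally, so the PIN never has to be pasted anywhere. The helper name build_pkcs11_uri and the sample values are hypothetical; the ";pin-value=" placement simply mirrors the URIs used in the PyCurl example above.

```python
from urllib.parse import quote


def build_pkcs11_uri(attributes: dict, pin: str = None) -> str:
    """Join percent-encoded key=value pairs into a pkcs11: URI string."""
    # safe="" forces every reserved character (space, ';', '@', '/') to be encoded
    parts = [f"{name}={quote(value, safe='')}" for name, value in attributes.items()]
    if pin is not None:
        # Same layout as the URIs in the PyCurl example (assumption about the engine)
        parts.append("pin-value=" + quote(pin, safe=""))
    return "pkcs11:" + ";".join(parts)


# Sample values are placeholders, not real token data.
uri = build_pkcs11_uri({"token": "My Token", "object": "key", "type": "private"}, pin="p@ss;1")
print(uri)
```

Binary fields such as id are usually already shown percent-encoded by p11tool, so they would need to be copied as-is rather than passed through this helper.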
2. M2Crypto
In my opinion, this is the best PKCS#11 implementation for Python. It lets you override urllib2, and therefore requests calls, thanks to the requests.Session.mount method and requests.adapters.BaseAdapter.
Here is some code to use it with urllib2:
from M2Crypto import m2urllib2 as urllib2
from M2Crypto import m2, SSL, Engine
# load dynamic engine
e = Engine.load_dynamic_engine("pkcs11", "/usr/lib/x86_64-linux-gnu/engines-1.1/libpkcs11.so")
pk = Engine.Engine("pkcs11")
pk.ctrl_cmd_string("MODULE_PATH", "/usr/lib/libeTPkcs11.so")
m2.engine_init(m2.engine_by_id("pkcs11"))
pk.ctrl_cmd_string("PIN", 'pin-pin')
cert = pk.load_certificate('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC')
key = pk.load_private_key('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC', pin='pin-pin')
ssl_context = SSL.Context('tls')
ssl_context.set_cipher_list('EECDH+AESGCM:EECDH+aECDSA:EECDH+aRSA:EDH+AESGCM:EDH+aECDSA:EDH+aRSA:!SHA1:!SHA256:!SHA384:!MEDIUM:!LOW:!EXP:!aNULL:!eNULL:!PSK:!SRP:@STRENGTH')
ssl_context.set_default_verify_paths()
ssl_context.set_allow_unknown_ca(True)
SSL.Connection.postConnectionCheck = None
m2.ssl_ctx_use_x509(ssl_context.ctx, cert.x509)
m2.ssl_ctx_use_pkey_privkey(ssl_context.ctx, key.pkey)
opener = urllib2.build_opener(ssl_context)
urllib2.install_opener(opener)
url = 'https://yourserver/endpoint'
content = urllib2.urlopen(url=url).read()
# content = opener.open(url)
print(content)
Note that, unlike with PyCurl, we don't indicate the type and PIN in the PKCS#11 URI. The PIN is passed to load_private_key as a plain string, not URL-encoded.
Finally, I found a good implementation for the HttpAdapter to mount on requests to do the calls. Here is the original implementation.
I added some lines to support the PIN code and to disable checking hostname with this:
M2Crypto.SSL.Connection.postConnectionCheck = None
And here is how to mount the adapter:
from requests import Session
from m2requests import M2HttpsAdapter
import pprint, json
request = Session()
m2httpsadapter = M2HttpsAdapter()
# Added by me to set a pin when loading private key
m2httpsadapter.pin = 'pin-pin'
# Need this attribute set to False else we cannot use direct IP (added by me to disable checking hostname)
m2httpsadapter.check_hostname = False
request.mount("https://", m2httpsadapter)
request.cert=('pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC',
'pkcs11:model=XXX;manufacturer=YYYYY;serial=ZZZZ;'
'token=AAAAA;id=BBBBBBBBB;'
'object=CCCCCC')
headers = {'HEADER_TO_ADD_IF_WANTED': 'VALUE', }
r = request.get("https://yourserver/endpoint", headers=headers, verify=False)
print(r.status_code)
pprint.pprint(json.loads(r.raw.data.decode('utf-8')))
Upvotes: 2
Reputation: 101
A private key in an HSM can be used to sign some data, which is what you are trying to accomplish for mutual authentication. Signing some data provided by the other party proves that you control the Private Key corresponding to the certificate.
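To make the idea concrete, here is a toy sketch of "sign what the other party sends, and let them verify it with the public key". It uses textbook RSA with tiny numbers and no padding, so it is insecure and purely illustrative; in a real TLS handshake the signed data is derived from the handshake messages, and the private-key step (sign below) would be performed inside the HSM. All names here are hypothetical.

```python
import hashlib
import secrets

# Toy textbook RSA key (p=61, q=53). Far too small for real use.
N, E, D = 3233, 17, 2753


def digest_int(data: bytes) -> int:
    """Hash the data and reduce it into the toy modulus range."""
    return int.from_bytes(hashlib.sha256(data).digest(), "big") % N


def sign(challenge: bytes) -> int:
    """Client side: the only step that needs the private exponent D."""
    return pow(digest_int(challenge), D, N)


def verify(challenge: bytes, signature: int) -> bool:
    """Server side: needs only the public values (N, E) from the certificate."""
    return pow(signature, E, N) == digest_int(challenge)


challenge = secrets.token_bytes(32)   # data chosen by the other party
signature = sign(challenge)
print(verify(challenge, signature))
```

The point of the sketch is that verify never sees D, which is why the key can stay inside the HSM.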
From the HSM you can get a handle to the private key and use that handle to perform private key operations without exporting the key (the whole reason for using an HSM is to prevent the content of the private key from being seen). HSMs can expose a lot of different interfaces but PKCS#11 is by far the most common. You don't really need to use OpenSSL or to program in C/C++. Since you are using Python requests, there are some PKCS#11 Python libraries that can be used.
Please take a look at this Python PKCS11 library: https://github.com/danni/python-pkcs11.
You can then do things like this (assuming the key is RSA):
import pkcs11
lib = pkcs11.lib("/usr/lib/libCryptoki2_64.so")
token = lib.get_token(token_label='DEMO')
# Open a session on our token
with token.open(user_pin='1234') as session:
    # Get the key from the HSM by label
    priv = session.get_key(
        object_class=pkcs11.constants.ObjectClass.PRIVATE_KEY,
        label='key_label')
    # sign with that key using the required mechanism:
    signature = priv.sign(my_to_be_signed_data, mechanism=pkcs11.Mechanism.SHA256_RSA_PKCS)
You do not provide many details about this mutual authentication but hopefully, this should get you on the right track. The code above retrieves the key based on a label, but this PKCS#11 library also supports finding a key in the HSM based on a lot of other properties.
Upvotes: 1