Reputation: 19
I am trying to set up a simple client and server using python-mbedtls, where only the server shall be certificated via one intermediate certificate. I was able to make it work without an intermediate certificate but I do not get why it is not working with it.
I have the following server.py and client.py but unfortunately I always get this error:
mbedtls.exceptions.TLSError: TLSError([0x2700] 'X509 - Certificate verification failed, e.g. CRL, CA or signature check failed')
Does anyone have an idea what could be the problem?
client.py:
hostname = "www.example.com"
port = 60187
# this is where the Client class provided by python-mbedtls is inserted
def main() -> None:
root_crt = x509.CRT.from_file("root.crt")
inter_crt = x509.CRT.from_file("inter.crt")
trust_store = TrustStore()
trust_store.add(root_crt)
trust_store.add(inter_crt)
conf = TLSConfiguration(
trust_store=trust_store,
validate_certificates=True,
)
message = "Hello world!"
with Client(
conf, socket.SOCK_STREAM, ("127.0.0.1", port), hostname
) as cli:
_enable_debug_output(cli.context)
_set_debug_level(2)
cli.do_handshake()
received = cli.echo(message.encode("utf-8"), 1024)
print(received.decode("utf-8"))
if __name__ == "__main__":
main()
server.py:
hostname = "www.example.com"
port = 60187
# this is where the Server class of the server.py provided by python-mbedtls is inserted
# from the unit tests of python-mbedtls:
def make_root_ca(
# pylint: disable=too-many-arguments
subject: Optional[str] = None,
not_before: Optional[dt.datetime] = None,
not_after: Optional[dt.datetime] = None,
serial_number: Optional[int] = None,
basic_constraints: Optional[BasicConstraints] = None,
digestmod: Optional[hashlib.Algorithm] = None,
) -> Tuple[CRT, _Key]:
if subject is None:
subject = "OU=test, CN=Trusted CA"
if not_before is None:
not_before = dt.datetime.utcnow()
if not_after is None:
not_after = not_before + dt.timedelta(days=90)
if serial_number is None:
serial_number = 0x123456
if basic_constraints is None:
basic_constraints = BasicConstraints(True, -1)
if digestmod is None:
digestmod = hashlib.sha256
key = RSA()
key.generate()
crt = CRT.selfsign(
csr=CSR.new(key, subject, digestmod()),
issuer_key=key,
not_before=not_before,
not_after=not_after,
serial_number=serial_number,
basic_constraints=basic_constraints,
)
return crt, key
# from the unit tests of python-mbedtls:
def make_crt(
# pylint: disable=too-many-arguments
issuer_crt: CRT,
issuer_key: str,
subject: Optional[str] = None,
not_before: Optional[dt.datetime] = None,
not_after: Optional[dt.datetime] = None,
serial_number: Optional[int] = None,
basic_constraints: Optional[BasicConstraints] = None,
digestmod: Optional[hashlib.Algorithm] = None,
) -> Tuple[CRT, str]:
if subject is None:
subject = "OU=test, CN=hostname"
if not_before is None:
not_before = issuer_crt.not_before
if not_after is None:
not_after = issuer_crt.not_after
if serial_number is None:
serial_number = 0x123456
if basic_constraints is None:
basic_constraints = BasicConstraints()
if digestmod is None:
# TODO: issuer_crt.digestmod should work but doesn't.
digestmod = hashlib.sha256
key = RSA()
key.generate()
crt = issuer_crt.sign(
csr=CSR.new(key, subject, digestmod()),
issuer_key=issuer_key,
not_before=not_before,
not_after=not_after,
serial_number=serial_number,
basic_constraints=basic_constraints,
)
return crt, key
# custom function to create my certificates:
def create_and_export_certificates():
root_crt, root_key = make_root_ca()
inter_crt, inter_key = make_crt(
root_crt, root_key, subject="OU=test, CN=Intermediate CA"
)
ee_crt, ee_key = make_crt(
inter_crt, inter_key, subject=f"OU=test, CN={hostname}"
)
with open("root.crt", "wt") as file:
file.write(root_crt.to_PEM())
with open("inter.crt", "wt") as file:
file.write(inter_crt.to_PEM())
print(root_crt.verify(inter_crt))
print(inter_crt.verify(ee_crt))
return (ee_crt, inter_crt, root_crt), ee_key
def main() -> NoReturn:
(ee_crt, inter_crt, root_crt), ee_key = create_and_export_certificates()
conf = TLSConfiguration(
certificate_chain=([ee_crt, inter_crt, root_crt], ee_key),
validate_certificates=False
)
with Server(conf, socket.SOCK_STREAM, ("127.0.0.1", 60_111)) as srv:
_enable_debug_output(srv.context)
_set_debug_level(2)
srv.run(partial(echo_handler, packet_size=4069))
if __name__ == "__main__":
import faulthandler
faulthandler.enable()
with suppress(KeyboardInterrupt):
main()
What I tried so far:
What I expected:
Upvotes: 0
Views: 87