diff --git a/test/modules/md/md_cert_util.py b/test/modules/md/md_cert_util.py index abcd36b..6cd034a 100644 --- a/test/modules/md/md_cert_util.py +++ b/test/modules/md/md_cert_util.py @@ -1,6 +1,5 @@ import logging import re -import os import socket import OpenSSL import time @@ -12,6 +11,7 @@ from http.client import HTTPConnection from urllib.parse import urlparse +from cryptography import x509 SEC_PER_DAY = 24 * 60 * 60 @@ -23,45 +23,6 @@ class MDCertUtil(object): # Utility class for inspecting certificates in test cases # Uses PyOpenSSL: https://pyopenssl.org/en/stable/index.html - @classmethod - def create_self_signed_cert(cls, path, name_list, valid_days, serial=1000): - domain = name_list[0] - if not os.path.exists(path): - os.makedirs(path) - - cert_file = os.path.join(path, 'pubcert.pem') - pkey_file = os.path.join(path, 'privkey.pem') - # create a key pair - if os.path.exists(pkey_file): - key_buffer = open(pkey_file, 'rt').read() - k = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_buffer) - else: - k = OpenSSL.crypto.PKey() - k.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) - - # create a self-signed cert - cert = OpenSSL.crypto.X509() - cert.get_subject().C = "DE" - cert.get_subject().ST = "NRW" - cert.get_subject().L = "Muenster" - cert.get_subject().O = "greenbytes GmbH" - cert.get_subject().CN = domain - cert.set_serial_number(serial) - cert.gmtime_adj_notBefore(valid_days["notBefore"] * SEC_PER_DAY) - cert.gmtime_adj_notAfter(valid_days["notAfter"] * SEC_PER_DAY) - cert.set_issuer(cert.get_subject()) - - cert.add_extensions([OpenSSL.crypto.X509Extension( - b"subjectAltName", False, b", ".join(map(lambda n: b"DNS:" + n.encode(), name_list)) - )]) - cert.set_pubkey(k) - cert.sign(k, 'sha1') - - open(cert_file, "wt").write( - OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert).decode('utf-8')) - open(pkey_file, "wt").write( - OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, k).decode('utf-8')) - @classmethod def load_server_cert(cls, host_ip, host_port, host_name, tls=None, ciphers=None): ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD) @@ -138,17 +99,26 @@ def get_serial(self): # add leading 0s to align with word boundaries. return ("%lx" % (self.cert.get_serial_number())).upper() - def same_serial_as(self, other): - if isinstance(other, MDCertUtil): - return self.cert.get_serial_number() == other.cert.get_serial_number() - elif isinstance(other, OpenSSL.crypto.X509): - return self.cert.get_serial_number() == other.get_serial_number() - elif isinstance(other, str): + @staticmethod + def _get_serial(cert) -> int: + if isinstance(cert, x509.Certificate): + return cert.serial_number + if isinstance(cert, MDCertUtil): + return cert.get_serial_number() + elif isinstance(cert, OpenSSL.crypto.X509): + return cert.get_serial_number() + elif isinstance(cert, str): # assume a hex number - return self.cert.get_serial_number() == int(other, 16) - elif isinstance(other, int): - return self.cert.get_serial_number() == other - return False + return int(cert, 16) + elif isinstance(cert, int): + return cert + return 0 + + def get_serial_number(self): + return self._get_serial(self.cert) + + def same_serial_as(self, other): + return self._get_serial(self.cert) == self._get_serial(other) def get_not_before(self): tsp = self.cert.get_notBefore() diff --git a/test/modules/md/md_env.py b/test/modules/md/md_env.py index 1936519..a958781 100644 --- a/test/modules/md/md_env.py +++ b/test/modules/md/md_env.py @@ -12,9 +12,9 @@ import time from datetime import datetime, timedelta -from typing import Dict, Optional +from typing import Dict, Optional, Any -from pyhttpd.certs import CertificateSpec +from pyhttpd.certs import CertificateSpec, Credentials, HttpdTestCA from .md_cert_util import MDCertUtil from pyhttpd.env import HttpdTestSetup, HttpdTestEnv from pyhttpd.result import ExecResult @@ -611,8 +611,13 @@ def await_ocsp_status(self, domain, timeout=10, ca_file=None): time.sleep(0.1) raise TimeoutError(f"ocsp respopnse not available: {domain}") - def create_self_signed_cert(self, name_list, valid_days, serial=1000, path=None): - dirpath = path - if not path: - dirpath = os.path.join(self.store_domains(), name_list[0]) - return MDCertUtil.create_self_signed_cert(dirpath, name_list, valid_days, serial) + def create_self_signed_cert(self, spec: CertificateSpec, + valid_from: timedelta = timedelta(days=-1), + valid_to: timedelta = timedelta(days=89), + serial: Optional[int] = None) -> Credentials: + key_type = spec.key_type if spec.key_type else 'rsa4096' + return HttpdTestCA.create_credentials(spec=spec, issuer=None, + key_type=key_type, + valid_from=valid_from, + valid_to=valid_to, + serial=serial) diff --git a/test/modules/md/test_502_acmev2_drive.py b/test/modules/md/test_502_acmev2_drive.py index eb754f2..b064647 100644 --- a/test/modules/md/test_502_acmev2_drive.py +++ b/test/modules/md/test_502_acmev2_drive.py @@ -4,11 +4,12 @@ import json import os.path import re -import time +from datetime import timedelta import pytest +from pyhttpd.certs import CertificateSpec -from .md_conf import MDConf, MDConf +from .md_conf import MDConf from .md_cert_util import MDCertUtil from .md_env import MDTestEnv @@ -430,9 +431,12 @@ def test_md_502_201(self, env, renew_window, test_data_list): print("TRACE: start testing renew window: %s" % renew_window) for tc in test_data_list: print("TRACE: create self-signed cert: %s" % tc["valid"]) - env.create_self_signed_cert([name], tc["valid"]) - cert2 = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) - assert not cert2.same_serial_as(cert1) + creds = env.create_self_signed_cert(CertificateSpec(domains=[name]), + valid_from=timedelta(days=tc["valid"]["notBefore"]), + valid_to=timedelta(days=tc["valid"]["notAfter"])) + assert creds.certificate.serial_number != cert1.get_serial_number() + # copy it over, assess status again + creds.save_cert_pem(env.store_domain_file(name, 'pubcert.pem')) md = env.a2md(["list", name]).json['output'][0] assert md["renew"] == tc["renew"], \ "Expected renew == {} indicator in {}, test case {}".format(tc["renew"], md, tc) diff --git a/test/modules/md/test_702_auto.py b/test/modules/md/test_702_auto.py index da1ca1f..90103e3 100644 --- a/test/modules/md/test_702_auto.py +++ b/test/modules/md/test_702_auto.py @@ -1,9 +1,9 @@ import os -import time +from datetime import timedelta import pytest +from pyhttpd.certs import CertificateSpec -from pyhttpd.conf import HttpdConf from pyhttpd.env import HttpdTestEnv from .md_cert_util import MDCertUtil from .md_env import MDTestEnv @@ -320,18 +320,22 @@ def test_md_702_009(self, env): assert cert1.same_serial_as(stat['rsa']['serial']) # # create self-signed cert, with critical remaining valid duration -> drive again - env.create_self_signed_cert([domain], {"notBefore": -120, "notAfter": 2}, serial=7029) - cert3 = MDCertUtil(env.store_domain_file(domain, 'pubcert.pem')) - assert cert3.same_serial_as('1B75') + creds = env.create_self_signed_cert(CertificateSpec(domains=[domain]), + valid_from=timedelta(days=-120), + valid_to=timedelta(days=2), + serial=7029) + creds.save_cert_pem(env.store_domain_file(domain, 'pubcert.pem')) + creds.save_pkey_pem(env.store_domain_file(domain, 'privkey.pem')) + assert creds.certificate.serial_number == 7029 assert env.apache_restart() == 0 stat = env.get_certificate_status(domain) - assert cert3.same_serial_as(stat['rsa']['serial']) + assert creds.certificate.serial_number == int(stat['rsa']['serial'], 16) # # cert should renew and be different afterwards assert env.await_completion([domain], must_renew=True) stat = env.get_certificate_status(domain) - assert not cert3.same_serial_as(stat['rsa']['serial']) - + creds.certificate.serial_number != int(stat['rsa']['serial'], 16) + # test case: drive with an unsupported challenge due to port availability def test_md_702_010(self, env): domain = self.test_domain diff --git a/test/modules/md/test_730_static.py b/test/modules/md/test_730_static.py index 891ae62..209d33a 100644 --- a/test/modules/md/test_730_static.py +++ b/test/modules/md/test_730_static.py @@ -1,6 +1,8 @@ import os +from datetime import timedelta import pytest +from pyhttpd.certs import CertificateSpec from .md_conf import MDConf from .md_env import MDTestEnv @@ -30,12 +32,14 @@ def test_md_730_001(self, env): domains = [domain, 'www.%s' % domain] testpath = os.path.join(env.gen_dir, 'test_920_001') # cert that is only 10 more days valid - env.create_self_signed_cert(domains, {"notBefore": -80, "notAfter": 10}, - serial=730001, path=testpath) + creds = env.create_self_signed_cert(CertificateSpec(domains=domains), + valid_from=timedelta(days=-80), + valid_to=timedelta(days=10), + serial=730001) cert_file = os.path.join(testpath, 'pubcert.pem') pkey_file = os.path.join(testpath, 'privkey.pem') - assert os.path.exists(cert_file) - assert os.path.exists(pkey_file) + creds.save_cert_pem(cert_file) + creds.save_pkey_pem(pkey_file) conf = MDConf(env) conf.start_md(domains) conf.add(f"MDCertificateFile {cert_file}") @@ -60,12 +64,14 @@ def test_md_730_002(self, env): domains = [domain, 'www.%s' % domain] testpath = os.path.join(env.gen_dir, 'test_920_001') # cert that is only 10 more days valid - env.create_self_signed_cert(domains, {"notBefore": -80, "notAfter": 10}, - serial=730001, path=testpath) + creds = env.create_self_signed_cert(CertificateSpec(domains=domains), + valid_from=timedelta(days=-80), + valid_to=timedelta(days=10), + serial=730001) cert_file = os.path.join(testpath, 'pubcert.pem') pkey_file = os.path.join(testpath, 'privkey.pem') - assert os.path.exists(cert_file) - assert os.path.exists(pkey_file) + creds.save_cert_pem(cert_file) + creds.save_pkey_pem(pkey_file) conf = MDConf(env) conf.start_md(domains) conf.add(f"MDPrivateKeys secp384r1 rsa3072") @@ -93,13 +99,14 @@ def test_md_730_003(self, env): domains = [domain, 'www.%s' % domain] testpath = os.path.join(env.gen_dir, 'test_920_001') # cert that is only 10 more days valid - env.create_self_signed_cert(domains, {"notBefore": -80, "notAfter": 10}, - serial=730001, path=testpath) + creds = env.create_self_signed_cert(CertificateSpec(domains=domains), + valid_from=timedelta(days=-80), + valid_to=timedelta(days=10), + serial=730001) cert_file = os.path.join(testpath, 'pubcert.pem') pkey_file = os.path.join(testpath, 'privkey.pem') - assert os.path.exists(cert_file) - assert os.path.exists(pkey_file) - + creds.save_cert_pem(cert_file) + creds.save_pkey_pem(pkey_file) conf = MDConf(env) conf.start_md(domains) conf.add(f"MDCertificateFile {cert_file}") diff --git a/test/modules/md/test_801_stapling.py b/test/modules/md/test_801_stapling.py index 5c03602..1eacfab 100644 --- a/test/modules/md/test_801_stapling.py +++ b/test/modules/md/test_801_stapling.py @@ -2,7 +2,9 @@ import os import time +from datetime import timedelta import pytest +from pyhttpd.certs import CertificateSpec from .md_conf import MDConf from .md_env import MDTestEnv @@ -334,12 +336,14 @@ def test_md_801_009(self, env): domains = [md] testpath = os.path.join(env.gen_dir, 'test_801_009') # cert that is 30 more days valid - env.create_self_signed_cert(domains, {"notBefore": -60, "notAfter": 30}, - serial=801009, path=testpath) + creds = env.create_self_signed_cert(CertificateSpec(domains=domains), + valid_from=timedelta(days=-60), + valid_to=timedelta(days=30), + serial=801009) cert_file = os.path.join(testpath, 'pubcert.pem') pkey_file = os.path.join(testpath, 'privkey.pem') - assert os.path.exists(cert_file) - assert os.path.exists(pkey_file) + creds.save_cert_pem(cert_file) + creds.save_pkey_pem(pkey_file) conf = MDConf(env) conf.start_md(domains) conf.add("MDCertificateFile %s" % cert_file) diff --git a/test/modules/md/test_901_message.py b/test/modules/md/test_901_message.py index b18cfd3..d5d66e6 100644 --- a/test/modules/md/test_901_message.py +++ b/test/modules/md/test_901_message.py @@ -3,9 +3,11 @@ import json import os import time +from datetime import timedelta import pytest +from pyhttpd.certs import CertificateSpec -from .md_conf import MDConf, MDConf +from .md_conf import MDConf from .md_env import MDTestEnv @@ -155,13 +157,15 @@ def test_md_901_010(self, env): domain = self.test_domain domains = [domain, 'www.%s' % domain] testpath = os.path.join(env.gen_dir, 'test_901_010') - # cert that is only 10 more days valid - env.create_self_signed_cert(domains, {"notBefore": -70, "notAfter": 20}, - serial=901010, path=testpath) + # cert that is only 20 more days valid + creds = env.create_self_signed_cert(CertificateSpec(domains=domains), + valid_from=timedelta(days=-70), + valid_to=timedelta(days=20), + serial=901010) cert_file = os.path.join(testpath, 'pubcert.pem') pkey_file = os.path.join(testpath, 'privkey.pem') - assert os.path.exists(cert_file) - assert os.path.exists(pkey_file) + creds.save_cert_pem(cert_file) + creds.save_pkey_pem(pkey_file) conf = MDConf(env) conf.add(f"MDMessageCmd {self.mcmd} {self.mlog}") conf.start_md(domains) @@ -178,13 +182,15 @@ def test_md_901_011(self, env): domain = self.test_domain domains = [domain, f'www.{domain}'] testpath = os.path.join(env.gen_dir, 'test_901_011') - # cert that is only 10 more days valid - env.create_self_signed_cert(domains, {"notBefore": -85, "notAfter": 5}, - serial=901011, path=testpath) + # cert that is only 5 more days valid + creds = env.create_self_signed_cert(CertificateSpec(domains=domains), + valid_from=timedelta(days=-85), + valid_to=timedelta(days=5), + serial=901010) cert_file = os.path.join(testpath, 'pubcert.pem') pkey_file = os.path.join(testpath, 'privkey.pem') - assert os.path.exists(cert_file) - assert os.path.exists(pkey_file) + creds.save_cert_pem(cert_file) + creds.save_pkey_pem(pkey_file) conf = MDConf(env) conf.add(f"MDMessageCmd {self.mcmd} {self.mlog}") conf.start_md(domains) diff --git a/test/modules/md/test_920_status.py b/test/modules/md/test_920_status.py index 6ad7087..6a5b338 100644 --- a/test/modules/md/test_920_status.py +++ b/test/modules/md/test_920_status.py @@ -2,9 +2,10 @@ import os import re -import time +from datetime import timedelta import pytest +from pyhttpd.certs import CertificateSpec from .md_conf import MDConf from shutil import copyfile @@ -165,13 +166,15 @@ def test_md_920_011(self, env): domain = self.test_domain domains = [domain, 'www.%s' % domain] testpath = os.path.join(env.gen_dir, 'test_920_011') - # cert that is only 10 more days valid - env.create_self_signed_cert(domains, {"notBefore": -70, "notAfter": 20}, - serial=920011, path=testpath) + # cert that is only 20 more days valid + creds = env.create_self_signed_cert(CertificateSpec(domains=domains), + valid_from=timedelta(days=-70), + valid_to=timedelta(days=20), + serial=920011) cert_file = os.path.join(testpath, 'pubcert.pem') pkey_file = os.path.join(testpath, 'privkey.pem') - assert os.path.exists(cert_file) - assert os.path.exists(pkey_file) + creds.save_cert_pem(cert_file) + creds.save_pkey_pem(pkey_file) conf = MDConf(env, std_vhosts=False, std_ports=False, text=f""" MDBaseServer on MDPortMap http:- https:{env.https_port} diff --git a/test/pyhttpd/certs.py b/test/pyhttpd/certs.py index 5519f16..a08d5e6 100644 --- a/test/pyhttpd/certs.py +++ b/test/pyhttpd/certs.py @@ -181,6 +181,14 @@ def issue_cert(self, spec: CertificateSpec, chain: List['Credentials'] = None) - creds.issue_certs(spec.sub_specs, chain=subchain) return creds + def save_cert_pem(self, fpath): + with open(fpath, "wb") as fd: + fd.write(self.cert_pem) + + def save_pkey_pem(self, fpath): + with open(fpath, "wb") as fd: + fd.write(self.pkey_pem) + class CertStore: @@ -282,6 +290,7 @@ def create_root(cls, name: str, store_dir: str, key_type: str = "rsa2048") -> Cr def create_credentials(spec: CertificateSpec, issuer: Credentials, key_type: Any, valid_from: timedelta = timedelta(days=-1), valid_to: timedelta = timedelta(days=89), + serial: Optional[int] = None, ) -> Credentials: """Create a certificate signed by this CA for the given domains. :returns: the certificate and private key PEM file paths @@ -289,15 +298,18 @@ def create_credentials(spec: CertificateSpec, issuer: Credentials, key_type: Any if spec.domains and len(spec.domains): creds = HttpdTestCA._make_server_credentials(name=spec.name, domains=spec.domains, issuer=issuer, valid_from=valid_from, - valid_to=valid_to, key_type=key_type) + valid_to=valid_to, key_type=key_type, + serial=serial) elif spec.client: creds = HttpdTestCA._make_client_credentials(name=spec.name, issuer=issuer, email=spec.email, valid_from=valid_from, - valid_to=valid_to, key_type=key_type) + valid_to=valid_to, key_type=key_type, + serial=serial) elif spec.name: creds = HttpdTestCA._make_ca_credentials(name=spec.name, issuer=issuer, valid_from=valid_from, valid_to=valid_to, - key_type=key_type) + key_type=key_type, + serial=serial) else: raise Exception(f"unrecognized certificate specification: {spec}") return creds @@ -320,7 +332,8 @@ def _make_csr( pkey: Any, issuer_subject: Optional[Credentials], valid_from_delta: timedelta = None, - valid_until_delta: timedelta = None + valid_until_delta: timedelta = None, + serial: Optional[int] = None ): pubkey = pkey.public_key() issuer_subject = issuer_subject if issuer_subject is not None else subject @@ -331,7 +344,8 @@ def _make_csr( valid_until = datetime.now() if valid_until_delta is not None: valid_until += valid_until_delta - + if serial is None: + serial = x509.random_serial_number() return ( x509.CertificateBuilder() .subject_name(subject) @@ -339,7 +353,7 @@ def _make_csr( .public_key(pubkey) .not_valid_before(valid_from) .not_valid_after(valid_until) - .serial_number(x509.random_serial_number()) + .serial_number(serial) .add_extension( x509.SubjectKeyIdentifier.from_public_key(pubkey), critical=False, @@ -374,23 +388,28 @@ def _add_ca_usages(csr: Any) -> Any: @staticmethod def _add_leaf_usages(csr: Any, domains: List[str], issuer: Credentials) -> Any: - return csr.add_extension( + csr = csr.add_extension( x509.BasicConstraints(ca=False, path_length=None), critical=True, - ).add_extension( - x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( - issuer.certificate.extensions.get_extension_for_class( - x509.SubjectKeyIdentifier).value), - critical=False - ).add_extension( + ) + if issuer is not None: + csr = csr.add_extension( + x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + issuer.certificate.extensions.get_extension_for_class( + x509.SubjectKeyIdentifier).value), + critical=False + ) + csr = csr.add_extension( x509.SubjectAlternativeName([x509.DNSName(domain) for domain in domains]), critical=True, - ).add_extension( + ) + csr = csr.add_extension( x509.ExtendedKeyUsage([ ExtendedKeyUsageOID.SERVER_AUTH, ]), critical=True ) + return csr @staticmethod def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: str = None) -> Any: @@ -421,6 +440,7 @@ def _make_ca_credentials(name, key_type: Any, issuer: Credentials = None, valid_from: timedelta = timedelta(days=-1), valid_to: timedelta = timedelta(days=89), + serial: Optional[int] = None, ) -> Credentials: pkey = _private_key(key_type=key_type) if issuer is not None: @@ -432,7 +452,8 @@ def _make_ca_credentials(name, key_type: Any, subject = HttpdTestCA._make_x509_name(org_name=name, parent=issuer.subject if issuer else None) csr = HttpdTestCA._make_csr(subject=subject, issuer_subject=issuer_subject, pkey=pkey, - valid_from_delta=valid_from, valid_until_delta=valid_to) + valid_from_delta=valid_from, valid_until_delta=valid_to, + serial=serial) csr = HttpdTestCA._add_ca_usages(csr) cert = csr.sign(private_key=issuer_key, algorithm=hashes.SHA256(), @@ -444,15 +465,23 @@ def _make_server_credentials(name: str, domains: List[str], issuer: Credentials, key_type: Any, valid_from: timedelta = timedelta(days=-1), valid_to: timedelta = timedelta(days=89), + serial: Optional[int] = None, ) -> Credentials: name = name pkey = _private_key(key_type=key_type) - subject = HttpdTestCA._make_x509_name(common_name=name, parent=issuer.subject) + if issuer is not None: + issuer_subject = issuer.certificate.subject + issuer_key = issuer.private_key + else: + issuer_subject = None + issuer_key = pkey + subject = HttpdTestCA._make_x509_name(common_name=name, parent=issuer_subject) csr = HttpdTestCA._make_csr(subject=subject, - issuer_subject=issuer.certificate.subject, pkey=pkey, - valid_from_delta=valid_from, valid_until_delta=valid_to) + issuer_subject=issuer_subject, pkey=pkey, + valid_from_delta=valid_from, valid_until_delta=valid_to, + serial=serial) csr = HttpdTestCA._add_leaf_usages(csr, domains=domains, issuer=issuer) - cert = csr.sign(private_key=issuer.private_key, + cert = csr.sign(private_key=issuer_key, algorithm=hashes.SHA256(), backend=default_backend()) return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer) @@ -463,14 +492,22 @@ def _make_client_credentials(name: str, key_type: Any, valid_from: timedelta = timedelta(days=-1), valid_to: timedelta = timedelta(days=89), + serial: Optional[int] = None, ) -> Credentials: pkey = _private_key(key_type=key_type) - subject = HttpdTestCA._make_x509_name(common_name=name, parent=issuer.subject) + if issuer is not None: + issuer_subject = issuer.certificate.subject + issuer_key = issuer.private_key + else: + issuer_subject = None + issuer_key = pkey + subject = HttpdTestCA._make_x509_name(common_name=name, parent=issuer_subject) csr = HttpdTestCA._make_csr(subject=subject, - issuer_subject=issuer.certificate.subject, pkey=pkey, - valid_from_delta=valid_from, valid_until_delta=valid_to) + issuer_subject=issuer_subject, pkey=pkey, + valid_from_delta=valid_from, valid_until_delta=valid_to, + serial=serial) csr = HttpdTestCA._add_client_usages(csr, issuer=issuer, rfc82name=email) - cert = csr.sign(private_key=issuer.private_key, + cert = csr.sign(private_key=issuer_key, algorithm=hashes.SHA256(), backend=default_backend()) return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)