1# This file is dual licensed under the terms of the Apache License, Version 
    2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 
    3# for complete details. 
    4 
    5from __future__ import annotations 
    6 
    7import abc 
    8import datetime 
    9import os 
    10import typing 
    11import warnings 
    12from collections.abc import Iterable 
    13 
    14from cryptography import utils 
    15from cryptography.hazmat.bindings._rust import x509 as rust_x509 
    16from cryptography.hazmat.primitives import hashes 
    17from cryptography.hazmat.primitives.asymmetric import ( 
    18    dsa, 
    19    ec, 
    20    ed448, 
    21    ed25519, 
    22    padding, 
    23    rsa, 
    24    x448, 
    25    x25519, 
    26) 
    27from cryptography.hazmat.primitives.asymmetric.types import ( 
    28    CertificateIssuerPrivateKeyTypes, 
    29    CertificatePublicKeyTypes, 
    30) 
    31from cryptography.x509.extensions import ( 
    32    Extension, 
    33    Extensions, 
    34    ExtensionType, 
    35    _make_sequence_methods, 
    36) 
    37from cryptography.x509.name import Name, _ASN1Type 
    38from cryptography.x509.oid import ObjectIdentifier 
    39 
    40_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1) 
    41 
    42# This must be kept in sync with sign.rs's list of allowable types in 
    43# identify_hash_type 
    44_AllowedHashTypes = typing.Union[ 
    45    hashes.SHA224, 
    46    hashes.SHA256, 
    47    hashes.SHA384, 
    48    hashes.SHA512, 
    49    hashes.SHA3_224, 
    50    hashes.SHA3_256, 
    51    hashes.SHA3_384, 
    52    hashes.SHA3_512, 
    53] 
    54 
    55 
    56class AttributeNotFound(Exception): 
    57    def __init__(self, msg: str, oid: ObjectIdentifier) -> None: 
    58        super().__init__(msg) 
    59        self.oid = oid 
    60 
    61 
    62def _reject_duplicate_extension( 
    63    extension: Extension[ExtensionType], 
    64    extensions: list[Extension[ExtensionType]], 
    65) -> None: 
    66    # This is quadratic in the number of extensions 
    67    for e in extensions: 
    68        if e.oid == extension.oid: 
    69            raise ValueError("This extension has already been set.") 
    70 
    71 
    72def _reject_duplicate_attribute( 
    73    oid: ObjectIdentifier, 
    74    attributes: list[tuple[ObjectIdentifier, bytes, int | None]], 
    75) -> None: 
    76    # This is quadratic in the number of attributes 
    77    for attr_oid, _, _ in attributes: 
    78        if attr_oid == oid: 
    79            raise ValueError("This attribute has already been set.") 
    80 
    81 
    82def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime: 
    83    """Normalizes a datetime to a naive datetime in UTC. 
    84 
    85    time -- datetime to normalize. Assumed to be in UTC if not timezone 
    86            aware. 
    87    """ 
    88    if time.tzinfo is not None: 
    89        offset = time.utcoffset() 
    90        offset = offset if offset else datetime.timedelta() 
    91        return time.replace(tzinfo=None) - offset 
    92    else: 
    93        return time 
    94 
    95 
    96class Attribute: 
    97    def __init__( 
    98        self, 
    99        oid: ObjectIdentifier, 
    100        value: bytes, 
    101        _type: int = _ASN1Type.UTF8String.value, 
    102    ) -> None: 
    103        self._oid = oid 
    104        self._value = value 
    105        self._type = _type 
    106 
    107    @property 
    108    def oid(self) -> ObjectIdentifier: 
    109        return self._oid 
    110 
    111    @property 
    112    def value(self) -> bytes: 
    113        return self._value 
    114 
    115    def __repr__(self) -> str: 
    116        return f"<Attribute(oid={self.oid}, value={self.value!r})>" 
    117 
    118    def __eq__(self, other: object) -> bool: 
    119        if not isinstance(other, Attribute): 
    120            return NotImplemented 
    121 
    122        return ( 
    123            self.oid == other.oid 
    124            and self.value == other.value 
    125            and self._type == other._type 
    126        ) 
    127 
    128    def __hash__(self) -> int: 
    129        return hash((self.oid, self.value, self._type)) 
    130 
    131 
    132class Attributes: 
    133    def __init__( 
    134        self, 
    135        attributes: Iterable[Attribute], 
    136    ) -> None: 
    137        self._attributes = list(attributes) 
    138 
    139    __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes") 
    140 
    141    def __repr__(self) -> str: 
    142        return f"<Attributes({self._attributes})>" 
    143 
    144    def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute: 
    145        for attr in self: 
    146            if attr.oid == oid: 
    147                return attr 
    148 
    149        raise AttributeNotFound(f"No {oid} attribute was found", oid) 
    150 
    151 
    152class Version(utils.Enum): 
    153    v1 = 0 
    154    v3 = 2 
    155 
    156 
    157class InvalidVersion(Exception): 
    158    def __init__(self, msg: str, parsed_version: int) -> None: 
    159        super().__init__(msg) 
    160        self.parsed_version = parsed_version 
    161 
    162 
    163Certificate = rust_x509.Certificate 
    164 
    165 
    166class RevokedCertificate(metaclass=abc.ABCMeta): 
    167    @property 
    168    @abc.abstractmethod 
    169    def serial_number(self) -> int: 
    170        """ 
    171        Returns the serial number of the revoked certificate. 
    172        """ 
    173 
    174    @property 
    175    @abc.abstractmethod 
    176    def revocation_date(self) -> datetime.datetime: 
    177        """ 
    178        Returns the date of when this certificate was revoked. 
    179        """ 
    180 
    181    @property 
    182    @abc.abstractmethod 
    183    def revocation_date_utc(self) -> datetime.datetime: 
    184        """ 
    185        Returns the date of when this certificate was revoked as a non-naive 
    186        UTC datetime. 
    187        """ 
    188 
    189    @property 
    190    @abc.abstractmethod 
    191    def extensions(self) -> Extensions: 
    192        """ 
    193        Returns an Extensions object containing a list of Revoked extensions. 
    194        """ 
    195 
    196 
    197# Runtime isinstance checks need this since the rust class is not a subclass. 
    198RevokedCertificate.register(rust_x509.RevokedCertificate) 
    199 
    200 
    201class _RawRevokedCertificate(RevokedCertificate): 
    202    def __init__( 
    203        self, 
    204        serial_number: int, 
    205        revocation_date: datetime.datetime, 
    206        extensions: Extensions, 
    207    ): 
    208        self._serial_number = serial_number 
    209        self._revocation_date = revocation_date 
    210        self._extensions = extensions 
    211 
    212    @property 
    213    def serial_number(self) -> int: 
    214        return self._serial_number 
    215 
    216    @property 
    217    def revocation_date(self) -> datetime.datetime: 
    218        warnings.warn( 
    219            "Properties that return a naïve datetime object have been " 
    220            "deprecated. Please switch to revocation_date_utc.", 
    221            utils.DeprecatedIn42, 
    222            stacklevel=2, 
    223        ) 
    224        return self._revocation_date 
    225 
    226    @property 
    227    def revocation_date_utc(self) -> datetime.datetime: 
    228        return self._revocation_date.replace(tzinfo=datetime.timezone.utc) 
    229 
    230    @property 
    231    def extensions(self) -> Extensions: 
    232        return self._extensions 
    233 
    234 
    235CertificateRevocationList = rust_x509.CertificateRevocationList 
    236CertificateSigningRequest = rust_x509.CertificateSigningRequest 
    237 
    238 
    239load_pem_x509_certificate = rust_x509.load_pem_x509_certificate 
    240load_der_x509_certificate = rust_x509.load_der_x509_certificate 
    241 
    242load_pem_x509_certificates = rust_x509.load_pem_x509_certificates 
    243 
    244load_pem_x509_csr = rust_x509.load_pem_x509_csr 
    245load_der_x509_csr = rust_x509.load_der_x509_csr 
    246 
    247load_pem_x509_crl = rust_x509.load_pem_x509_crl 
    248load_der_x509_crl = rust_x509.load_der_x509_crl 
    249 
    250 
    251class CertificateSigningRequestBuilder: 
    252    def __init__( 
    253        self, 
    254        subject_name: Name | None = None, 
    255        extensions: list[Extension[ExtensionType]] = [], 
    256        attributes: list[tuple[ObjectIdentifier, bytes, int | None]] = [], 
    257    ): 
    258        """ 
    259        Creates an empty X.509 certificate request (v1). 
    260        """ 
    261        self._subject_name = subject_name 
    262        self._extensions = extensions 
    263        self._attributes = attributes 
    264 
    265    def subject_name(self, name: Name) -> CertificateSigningRequestBuilder: 
    266        """ 
    267        Sets the certificate requestor's distinguished name. 
    268        """ 
    269        if not isinstance(name, Name): 
    270            raise TypeError("Expecting x509.Name object.") 
    271        if self._subject_name is not None: 
    272            raise ValueError("The subject name may only be set once.") 
    273        return CertificateSigningRequestBuilder( 
    274            name, self._extensions, self._attributes 
    275        ) 
    276 
    277    def add_extension( 
    278        self, extval: ExtensionType, critical: bool 
    279    ) -> CertificateSigningRequestBuilder: 
    280        """ 
    281        Adds an X.509 extension to the certificate request. 
    282        """ 
    283        if not isinstance(extval, ExtensionType): 
    284            raise TypeError("extension must be an ExtensionType") 
    285 
    286        extension = Extension(extval.oid, critical, extval) 
    287        _reject_duplicate_extension(extension, self._extensions) 
    288 
    289        return CertificateSigningRequestBuilder( 
    290            self._subject_name, 
    291            [*self._extensions, extension], 
    292            self._attributes, 
    293        ) 
    294 
    295    def add_attribute( 
    296        self, 
    297        oid: ObjectIdentifier, 
    298        value: bytes, 
    299        *, 
    300        _tag: _ASN1Type | None = None, 
    301    ) -> CertificateSigningRequestBuilder: 
    302        """ 
    303        Adds an X.509 attribute with an OID and associated value. 
    304        """ 
    305        if not isinstance(oid, ObjectIdentifier): 
    306            raise TypeError("oid must be an ObjectIdentifier") 
    307 
    308        if not isinstance(value, bytes): 
    309            raise TypeError("value must be bytes") 
    310 
    311        if _tag is not None and not isinstance(_tag, _ASN1Type): 
    312            raise TypeError("tag must be _ASN1Type") 
    313 
    314        _reject_duplicate_attribute(oid, self._attributes) 
    315 
    316        if _tag is not None: 
    317            tag = _tag.value 
    318        else: 
    319            tag = None 
    320 
    321        return CertificateSigningRequestBuilder( 
    322            self._subject_name, 
    323            self._extensions, 
    324            [*self._attributes, (oid, value, tag)], 
    325        ) 
    326 
    327    def sign( 
    328        self, 
    329        private_key: CertificateIssuerPrivateKeyTypes, 
    330        algorithm: _AllowedHashTypes | None, 
    331        backend: typing.Any = None, 
    332        *, 
    333        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 
    334        ecdsa_deterministic: bool | None = None, 
    335    ) -> CertificateSigningRequest: 
    336        """ 
    337        Signs the request using the requestor's private key. 
    338        """ 
    339        if self._subject_name is None: 
    340            raise ValueError("A CertificateSigningRequest must have a subject") 
    341 
    342        if rsa_padding is not None: 
    343            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 
    344                raise TypeError("Padding must be PSS or PKCS1v15") 
    345            if not isinstance(private_key, rsa.RSAPrivateKey): 
    346                raise TypeError("Padding is only supported for RSA keys") 
    347 
    348        if ecdsa_deterministic is not None: 
    349            if not isinstance(private_key, ec.EllipticCurvePrivateKey): 
    350                raise TypeError( 
    351                    "Deterministic ECDSA is only supported for EC keys" 
    352                ) 
    353 
    354        return rust_x509.create_x509_csr( 
    355            self, 
    356            private_key, 
    357            algorithm, 
    358            rsa_padding, 
    359            ecdsa_deterministic, 
    360        ) 
    361 
    362 
    363class CertificateBuilder: 
    364    _extensions: list[Extension[ExtensionType]] 
    365 
    366    def __init__( 
    367        self, 
    368        issuer_name: Name | None = None, 
    369        subject_name: Name | None = None, 
    370        public_key: CertificatePublicKeyTypes | None = None, 
    371        serial_number: int | None = None, 
    372        not_valid_before: datetime.datetime | None = None, 
    373        not_valid_after: datetime.datetime | None = None, 
    374        extensions: list[Extension[ExtensionType]] = [], 
    375    ) -> None: 
    376        self._version = Version.v3 
    377        self._issuer_name = issuer_name 
    378        self._subject_name = subject_name 
    379        self._public_key = public_key 
    380        self._serial_number = serial_number 
    381        self._not_valid_before = not_valid_before 
    382        self._not_valid_after = not_valid_after 
    383        self._extensions = extensions 
    384 
    385    def issuer_name(self, name: Name) -> CertificateBuilder: 
    386        """ 
    387        Sets the CA's distinguished name. 
    388        """ 
    389        if not isinstance(name, Name): 
    390            raise TypeError("Expecting x509.Name object.") 
    391        if self._issuer_name is not None: 
    392            raise ValueError("The issuer name may only be set once.") 
    393        return CertificateBuilder( 
    394            name, 
    395            self._subject_name, 
    396            self._public_key, 
    397            self._serial_number, 
    398            self._not_valid_before, 
    399            self._not_valid_after, 
    400            self._extensions, 
    401        ) 
    402 
    403    def subject_name(self, name: Name) -> CertificateBuilder: 
    404        """ 
    405        Sets the requestor's distinguished name. 
    406        """ 
    407        if not isinstance(name, Name): 
    408            raise TypeError("Expecting x509.Name object.") 
    409        if self._subject_name is not None: 
    410            raise ValueError("The subject name may only be set once.") 
    411        return CertificateBuilder( 
    412            self._issuer_name, 
    413            name, 
    414            self._public_key, 
    415            self._serial_number, 
    416            self._not_valid_before, 
    417            self._not_valid_after, 
    418            self._extensions, 
    419        ) 
    420 
    421    def public_key( 
    422        self, 
    423        key: CertificatePublicKeyTypes, 
    424    ) -> CertificateBuilder: 
    425        """ 
    426        Sets the requestor's public key (as found in the signing request). 
    427        """ 
    428        if not isinstance( 
    429            key, 
    430            ( 
    431                dsa.DSAPublicKey, 
    432                rsa.RSAPublicKey, 
    433                ec.EllipticCurvePublicKey, 
    434                ed25519.Ed25519PublicKey, 
    435                ed448.Ed448PublicKey, 
    436                x25519.X25519PublicKey, 
    437                x448.X448PublicKey, 
    438            ), 
    439        ): 
    440            raise TypeError( 
    441                "Expecting one of DSAPublicKey, RSAPublicKey," 
    442                " EllipticCurvePublicKey, Ed25519PublicKey," 
    443                " Ed448PublicKey, X25519PublicKey, or " 
    444                "X448PublicKey." 
    445            ) 
    446        if self._public_key is not None: 
    447            raise ValueError("The public key may only be set once.") 
    448        return CertificateBuilder( 
    449            self._issuer_name, 
    450            self._subject_name, 
    451            key, 
    452            self._serial_number, 
    453            self._not_valid_before, 
    454            self._not_valid_after, 
    455            self._extensions, 
    456        ) 
    457 
    458    def serial_number(self, number: int) -> CertificateBuilder: 
    459        """ 
    460        Sets the certificate serial number. 
    461        """ 
    462        if not isinstance(number, int): 
    463            raise TypeError("Serial number must be of integral type.") 
    464        if self._serial_number is not None: 
    465            raise ValueError("The serial number may only be set once.") 
    466        if number <= 0: 
    467            raise ValueError("The serial number should be positive.") 
    468 
    469        # ASN.1 integers are always signed, so most significant bit must be 
    470        # zero. 
    471        if number.bit_length() >= 160:  # As defined in RFC 5280 
    472            raise ValueError( 
    473                "The serial number should not be more than 159 bits." 
    474            ) 
    475        return CertificateBuilder( 
    476            self._issuer_name, 
    477            self._subject_name, 
    478            self._public_key, 
    479            number, 
    480            self._not_valid_before, 
    481            self._not_valid_after, 
    482            self._extensions, 
    483        ) 
    484 
    485    def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder: 
    486        """ 
    487        Sets the certificate activation time. 
    488        """ 
    489        if not isinstance(time, datetime.datetime): 
    490            raise TypeError("Expecting datetime object.") 
    491        if self._not_valid_before is not None: 
    492            raise ValueError("The not valid before may only be set once.") 
    493        time = _convert_to_naive_utc_time(time) 
    494        if time < _EARLIEST_UTC_TIME: 
    495            raise ValueError( 
    496                "The not valid before date must be on or after" 
    497                " 1950 January 1)." 
    498            ) 
    499        if self._not_valid_after is not None and time > self._not_valid_after: 
    500            raise ValueError( 
    501                "The not valid before date must be before the not valid after " 
    502                "date." 
    503            ) 
    504        return CertificateBuilder( 
    505            self._issuer_name, 
    506            self._subject_name, 
    507            self._public_key, 
    508            self._serial_number, 
    509            time, 
    510            self._not_valid_after, 
    511            self._extensions, 
    512        ) 
    513 
    514    def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder: 
    515        """ 
    516        Sets the certificate expiration time. 
    517        """ 
    518        if not isinstance(time, datetime.datetime): 
    519            raise TypeError("Expecting datetime object.") 
    520        if self._not_valid_after is not None: 
    521            raise ValueError("The not valid after may only be set once.") 
    522        time = _convert_to_naive_utc_time(time) 
    523        if time < _EARLIEST_UTC_TIME: 
    524            raise ValueError( 
    525                "The not valid after date must be on or after 1950 January 1." 
    526            ) 
    527        if ( 
    528            self._not_valid_before is not None 
    529            and time < self._not_valid_before 
    530        ): 
    531            raise ValueError( 
    532                "The not valid after date must be after the not valid before " 
    533                "date." 
    534            ) 
    535        return CertificateBuilder( 
    536            self._issuer_name, 
    537            self._subject_name, 
    538            self._public_key, 
    539            self._serial_number, 
    540            self._not_valid_before, 
    541            time, 
    542            self._extensions, 
    543        ) 
    544 
    545    def add_extension( 
    546        self, extval: ExtensionType, critical: bool 
    547    ) -> CertificateBuilder: 
    548        """ 
    549        Adds an X.509 extension to the certificate. 
    550        """ 
    551        if not isinstance(extval, ExtensionType): 
    552            raise TypeError("extension must be an ExtensionType") 
    553 
    554        extension = Extension(extval.oid, critical, extval) 
    555        _reject_duplicate_extension(extension, self._extensions) 
    556 
    557        return CertificateBuilder( 
    558            self._issuer_name, 
    559            self._subject_name, 
    560            self._public_key, 
    561            self._serial_number, 
    562            self._not_valid_before, 
    563            self._not_valid_after, 
    564            [*self._extensions, extension], 
    565        ) 
    566 
    567    def sign( 
    568        self, 
    569        private_key: CertificateIssuerPrivateKeyTypes, 
    570        algorithm: _AllowedHashTypes | None, 
    571        backend: typing.Any = None, 
    572        *, 
    573        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 
    574        ecdsa_deterministic: bool | None = None, 
    575    ) -> Certificate: 
    576        """ 
    577        Signs the certificate using the CA's private key. 
    578        """ 
    579        if self._subject_name is None: 
    580            raise ValueError("A certificate must have a subject name") 
    581 
    582        if self._issuer_name is None: 
    583            raise ValueError("A certificate must have an issuer name") 
    584 
    585        if self._serial_number is None: 
    586            raise ValueError("A certificate must have a serial number") 
    587 
    588        if self._not_valid_before is None: 
    589            raise ValueError("A certificate must have a not valid before time") 
    590 
    591        if self._not_valid_after is None: 
    592            raise ValueError("A certificate must have a not valid after time") 
    593 
    594        if self._public_key is None: 
    595            raise ValueError("A certificate must have a public key") 
    596 
    597        if rsa_padding is not None: 
    598            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 
    599                raise TypeError("Padding must be PSS or PKCS1v15") 
    600            if not isinstance(private_key, rsa.RSAPrivateKey): 
    601                raise TypeError("Padding is only supported for RSA keys") 
    602 
    603        if ecdsa_deterministic is not None: 
    604            if not isinstance(private_key, ec.EllipticCurvePrivateKey): 
    605                raise TypeError( 
    606                    "Deterministic ECDSA is only supported for EC keys" 
    607                ) 
    608 
    609        return rust_x509.create_x509_certificate( 
    610            self, 
    611            private_key, 
    612            algorithm, 
    613            rsa_padding, 
    614            ecdsa_deterministic, 
    615        ) 
    616 
    617 
    618class CertificateRevocationListBuilder: 
    619    _extensions: list[Extension[ExtensionType]] 
    620    _revoked_certificates: list[RevokedCertificate] 
    621 
    622    def __init__( 
    623        self, 
    624        issuer_name: Name | None = None, 
    625        last_update: datetime.datetime | None = None, 
    626        next_update: datetime.datetime | None = None, 
    627        extensions: list[Extension[ExtensionType]] = [], 
    628        revoked_certificates: list[RevokedCertificate] = [], 
    629    ): 
    630        self._issuer_name = issuer_name 
    631        self._last_update = last_update 
    632        self._next_update = next_update 
    633        self._extensions = extensions 
    634        self._revoked_certificates = revoked_certificates 
    635 
    636    def issuer_name( 
    637        self, issuer_name: Name 
    638    ) -> CertificateRevocationListBuilder: 
    639        if not isinstance(issuer_name, Name): 
    640            raise TypeError("Expecting x509.Name object.") 
    641        if self._issuer_name is not None: 
    642            raise ValueError("The issuer name may only be set once.") 
    643        return CertificateRevocationListBuilder( 
    644            issuer_name, 
    645            self._last_update, 
    646            self._next_update, 
    647            self._extensions, 
    648            self._revoked_certificates, 
    649        ) 
    650 
    651    def last_update( 
    652        self, last_update: datetime.datetime 
    653    ) -> CertificateRevocationListBuilder: 
    654        if not isinstance(last_update, datetime.datetime): 
    655            raise TypeError("Expecting datetime object.") 
    656        if self._last_update is not None: 
    657            raise ValueError("Last update may only be set once.") 
    658        last_update = _convert_to_naive_utc_time(last_update) 
    659        if last_update < _EARLIEST_UTC_TIME: 
    660            raise ValueError( 
    661                "The last update date must be on or after 1950 January 1." 
    662            ) 
    663        if self._next_update is not None and last_update > self._next_update: 
    664            raise ValueError( 
    665                "The last update date must be before the next update date." 
    666            ) 
    667        return CertificateRevocationListBuilder( 
    668            self._issuer_name, 
    669            last_update, 
    670            self._next_update, 
    671            self._extensions, 
    672            self._revoked_certificates, 
    673        ) 
    674 
    675    def next_update( 
    676        self, next_update: datetime.datetime 
    677    ) -> CertificateRevocationListBuilder: 
    678        if not isinstance(next_update, datetime.datetime): 
    679            raise TypeError("Expecting datetime object.") 
    680        if self._next_update is not None: 
    681            raise ValueError("Last update may only be set once.") 
    682        next_update = _convert_to_naive_utc_time(next_update) 
    683        if next_update < _EARLIEST_UTC_TIME: 
    684            raise ValueError( 
    685                "The last update date must be on or after 1950 January 1." 
    686            ) 
    687        if self._last_update is not None and next_update < self._last_update: 
    688            raise ValueError( 
    689                "The next update date must be after the last update date." 
    690            ) 
    691        return CertificateRevocationListBuilder( 
    692            self._issuer_name, 
    693            self._last_update, 
    694            next_update, 
    695            self._extensions, 
    696            self._revoked_certificates, 
    697        ) 
    698 
    699    def add_extension( 
    700        self, extval: ExtensionType, critical: bool 
    701    ) -> CertificateRevocationListBuilder: 
    702        """ 
    703        Adds an X.509 extension to the certificate revocation list. 
    704        """ 
    705        if not isinstance(extval, ExtensionType): 
    706            raise TypeError("extension must be an ExtensionType") 
    707 
    708        extension = Extension(extval.oid, critical, extval) 
    709        _reject_duplicate_extension(extension, self._extensions) 
    710        return CertificateRevocationListBuilder( 
    711            self._issuer_name, 
    712            self._last_update, 
    713            self._next_update, 
    714            [*self._extensions, extension], 
    715            self._revoked_certificates, 
    716        ) 
    717 
    718    def add_revoked_certificate( 
    719        self, revoked_certificate: RevokedCertificate 
    720    ) -> CertificateRevocationListBuilder: 
    721        """ 
    722        Adds a revoked certificate to the CRL. 
    723        """ 
    724        if not isinstance(revoked_certificate, RevokedCertificate): 
    725            raise TypeError("Must be an instance of RevokedCertificate") 
    726 
    727        return CertificateRevocationListBuilder( 
    728            self._issuer_name, 
    729            self._last_update, 
    730            self._next_update, 
    731            self._extensions, 
    732            [*self._revoked_certificates, revoked_certificate], 
    733        ) 
    734 
    735    def sign( 
    736        self, 
    737        private_key: CertificateIssuerPrivateKeyTypes, 
    738        algorithm: _AllowedHashTypes | None, 
    739        backend: typing.Any = None, 
    740        *, 
    741        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, 
    742        ecdsa_deterministic: bool | None = None, 
    743    ) -> CertificateRevocationList: 
    744        if self._issuer_name is None: 
    745            raise ValueError("A CRL must have an issuer name") 
    746 
    747        if self._last_update is None: 
    748            raise ValueError("A CRL must have a last update time") 
    749 
    750        if self._next_update is None: 
    751            raise ValueError("A CRL must have a next update time") 
    752 
    753        if rsa_padding is not None: 
    754            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): 
    755                raise TypeError("Padding must be PSS or PKCS1v15") 
    756            if not isinstance(private_key, rsa.RSAPrivateKey): 
    757                raise TypeError("Padding is only supported for RSA keys") 
    758 
    759        if ecdsa_deterministic is not None: 
    760            if not isinstance(private_key, ec.EllipticCurvePrivateKey): 
    761                raise TypeError( 
    762                    "Deterministic ECDSA is only supported for EC keys" 
    763                ) 
    764 
    765        return rust_x509.create_x509_crl( 
    766            self, 
    767            private_key, 
    768            algorithm, 
    769            rsa_padding, 
    770            ecdsa_deterministic, 
    771        ) 
    772 
    773 
    774class RevokedCertificateBuilder: 
    775    def __init__( 
    776        self, 
    777        serial_number: int | None = None, 
    778        revocation_date: datetime.datetime | None = None, 
    779        extensions: list[Extension[ExtensionType]] = [], 
    780    ): 
    781        self._serial_number = serial_number 
    782        self._revocation_date = revocation_date 
    783        self._extensions = extensions 
    784 
    785    def serial_number(self, number: int) -> RevokedCertificateBuilder: 
    786        if not isinstance(number, int): 
    787            raise TypeError("Serial number must be of integral type.") 
    788        if self._serial_number is not None: 
    789            raise ValueError("The serial number may only be set once.") 
    790        if number <= 0: 
    791            raise ValueError("The serial number should be positive") 
    792 
    793        # ASN.1 integers are always signed, so most significant bit must be 
    794        # zero. 
    795        if number.bit_length() >= 160:  # As defined in RFC 5280 
    796            raise ValueError( 
    797                "The serial number should not be more than 159 bits." 
    798            ) 
    799        return RevokedCertificateBuilder( 
    800            number, self._revocation_date, self._extensions 
    801        ) 
    802 
    803    def revocation_date( 
    804        self, time: datetime.datetime 
    805    ) -> RevokedCertificateBuilder: 
    806        if not isinstance(time, datetime.datetime): 
    807            raise TypeError("Expecting datetime object.") 
    808        if self._revocation_date is not None: 
    809            raise ValueError("The revocation date may only be set once.") 
    810        time = _convert_to_naive_utc_time(time) 
    811        if time < _EARLIEST_UTC_TIME: 
    812            raise ValueError( 
    813                "The revocation date must be on or after 1950 January 1." 
    814            ) 
    815        return RevokedCertificateBuilder( 
    816            self._serial_number, time, self._extensions 
    817        ) 
    818 
    819    def add_extension( 
    820        self, extval: ExtensionType, critical: bool 
    821    ) -> RevokedCertificateBuilder: 
    822        if not isinstance(extval, ExtensionType): 
    823            raise TypeError("extension must be an ExtensionType") 
    824 
    825        extension = Extension(extval.oid, critical, extval) 
    826        _reject_duplicate_extension(extension, self._extensions) 
    827        return RevokedCertificateBuilder( 
    828            self._serial_number, 
    829            self._revocation_date, 
    830            [*self._extensions, extension], 
    831        ) 
    832 
    833    def build(self, backend: typing.Any = None) -> RevokedCertificate: 
    834        if self._serial_number is None: 
    835            raise ValueError("A revoked certificate must have a serial number") 
    836        if self._revocation_date is None: 
    837            raise ValueError( 
    838                "A revoked certificate must have a revocation date" 
    839            ) 
    840        return _RawRevokedCertificate( 
    841            self._serial_number, 
    842            self._revocation_date, 
    843            Extensions(self._extensions), 
    844        ) 
    845 
    846 
    847def random_serial_number() -> int: 
    848    return int.from_bytes(os.urandom(20), "big") >> 1