1"""Key interface and the default implementations""" 
    2 
    3from __future__ import annotations 
    4 
    5import logging 
    6from abc import ABCMeta, abstractmethod 
    7from typing import Any, cast 
    8 
    9from securesystemslib._vendor.ed25519.ed25519 import ( 
    10    SignatureMismatch, 
    11    checkvalid, 
    12) 
    13from securesystemslib.exceptions import ( 
    14    UnsupportedLibraryError, 
    15    UnverifiedSignatureError, 
    16    VerificationError, 
    17) 
    18from securesystemslib.signer._signature import Signature 
    19from securesystemslib.signer._utils import compute_default_keyid 
    20 
    21CRYPTO_IMPORT_ERROR = None 
    22try: 
    23    from cryptography.exceptions import InvalidSignature 
    24    from cryptography.hazmat.primitives.asymmetric.ec import ( 
    25        ECDSA, 
    26        SECP256R1, 
    27        SECP384R1, 
    28        SECP521R1, 
    29        EllipticCurve, 
    30        EllipticCurvePublicKey, 
    31    ) 
    32    from cryptography.hazmat.primitives.asymmetric.ed25519 import ( 
    33        Ed25519PublicKey, 
    34    ) 
    35    from cryptography.hazmat.primitives.asymmetric.padding import ( 
    36        MGF1, 
    37        PSS, 
    38        PKCS1v15, 
    39    ) 
    40    from cryptography.hazmat.primitives.asymmetric.rsa import ( 
    41        AsymmetricPadding, 
    42        RSAPublicKey, 
    43    ) 
    44    from cryptography.hazmat.primitives.asymmetric.types import PublicKeyTypes 
    45    from cryptography.hazmat.primitives.hashes import ( 
    46        SHA256, 
    47        SHA384, 
    48        SHA512, 
    49        HashAlgorithm, 
    50    ) 
    51    from cryptography.hazmat.primitives.serialization import ( 
    52        Encoding, 
    53        PublicFormat, 
    54        load_pem_public_key, 
    55    ) 
    56 
    57    from securesystemslib.signer._crypto_utils import get_hash_algorithm 
    58 
    59except ImportError: 
    60    CRYPTO_IMPORT_ERROR = "'pyca/cryptography' library required" 
    61 
    62 
    63logger = logging.getLogger(__name__) 
    64 
    65# NOTE Key dispatch table is defined here so it's usable by Key, 
    66# but is populated in __init__.py (and can be appended by users). 
    67KEY_FOR_TYPE_AND_SCHEME: dict[tuple[str, str], type] = {} 
    68"""Key dispatch table for ``Key.from_dict()`` 
    69 
    70See ``securesystemslib.signer.KEY_FOR_TYPE_AND_SCHEME`` for default key types 
    71and schemes, and how to register custom implementations. 
    72""" 
    73 
    74 
    75class Key(metaclass=ABCMeta): 
    76    """Abstract class representing the public portion of a key. 
    77 
    78    *All parameters named below are not just constructor arguments but also 
    79    instance attributes.* 
    80 
    81    Args: 
    82        keyid: Key identifier that is unique within the metadata it is used in. 
    83            Keyid is not verified to be the hash of a specific representation 
    84            of the key. 
    85        keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256". 
    86        scheme: Signature scheme. For example: 
    87            "rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256". 
    88        keyval: Opaque key content 
    89        unrecognized_fields: Dictionary of all attributes that are not managed 
    90            by Securesystemslib 
    91 
    92    Raises: 
    93        TypeError: Invalid type for an argument. 
    94    """ 
    95 
    96    def __init__( 
    97        self, 
    98        keyid: str, 
    99        keytype: str, 
    100        scheme: str, 
    101        keyval: dict[str, Any], 
    102        unrecognized_fields: dict[str, Any] | None = None, 
    103    ): 
    104        if not all( 
    105            isinstance(at, str) for at in [keyid, keytype, scheme] 
    106        ) or not isinstance(keyval, dict): 
    107            raise TypeError("Unexpected Key attributes types!") 
    108        self.keyid = keyid 
    109        self.keytype = keytype 
    110        self.scheme = scheme 
    111        self.keyval = keyval 
    112 
    113        if unrecognized_fields is None: 
    114            unrecognized_fields = {} 
    115 
    116        self.unrecognized_fields = unrecognized_fields 
    117 
    118    def __eq__(self, other: Any) -> bool: 
    119        if not isinstance(other, Key): 
    120            return False 
    121 
    122        return ( 
    123            self.keyid == other.keyid 
    124            and self.keytype == other.keytype 
    125            and self.scheme == other.scheme 
    126            and self.keyval == other.keyval 
    127            and self.unrecognized_fields == other.unrecognized_fields 
    128        ) 
    129 
    130    def __hash__(self) -> int: 
    131        return hash( 
    132            ( 
    133                self.keyid, 
    134                self.keytype, 
    135                self.scheme, 
    136                self.keyval, 
    137                self.unrecognized_fields, 
    138            ) 
    139        ) 
    140 
    141    @classmethod 
    142    @abstractmethod 
    143    def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> Key: 
    144        """Creates ``Key`` object from a serialization dict 
    145 
    146        Key implementations must override this factory constructor that is used 
    147        as a deserialization helper. 
    148 
    149        Users should call ``Key.from_dict()``: it dispatches to the actual 
    150        subclass implementation based on supported keys in 
    151        ``KEY_FOR_TYPE_AND_SCHEME``. 
    152 
    153        Raises: 
    154            KeyError, TypeError: Invalid arguments. 
    155        """ 
    156        keytype = key_dict.get("keytype") 
    157        scheme = key_dict.get("scheme") 
    158        if (keytype, scheme) not in KEY_FOR_TYPE_AND_SCHEME: 
    159            raise ValueError(f"Unsupported public key {keytype}/{scheme}") 
    160 
    161        # NOTE: Explicitly not checking the keytype and scheme types to allow 
    162        # intoto to use (None,None) to lookup GPGKey, see issue #450 
    163        key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)]  # type: ignore 
    164        return key_impl.from_dict(keyid, key_dict)  # type: ignore 
    165 
    166    @abstractmethod 
    167    def to_dict(self) -> dict[str, Any]: 
    168        """Returns a serialization dict. 
    169 
    170        Key implementations must override this serialization helper. 
    171        """ 
    172        raise NotImplementedError 
    173 
    174    def _to_dict(self) -> dict[str, Any]: 
    175        """Serialization helper to add base Key fields to a dict. 
    176 
    177        Key implementations may call this in their to_dict, which they must 
    178        still provide, in order to avoid unnoticed serialization accidents. 
    179        """ 
    180        return { 
    181            "keytype": self.keytype, 
    182            "scheme": self.scheme, 
    183            "keyval": self.keyval, 
    184            **self.unrecognized_fields, 
    185        } 
    186 
    187    @staticmethod 
    188    def _from_dict(key_dict: dict[str, Any]) -> tuple[str, str, dict[str, Any]]: 
    189        """Deserialization helper to pop base Key fields off the dict. 
    190 
    191        Key implementations may call this in their from_dict, in order to parse 
    192        out common fields. But they have to create the Key instance themselves. 
    193        """ 
    194        keytype = key_dict.pop("keytype") 
    195        scheme = key_dict.pop("scheme") 
    196        keyval = key_dict.pop("keyval") 
    197 
    198        return keytype, scheme, keyval 
    199 
    200    @abstractmethod 
    201    def verify_signature(self, signature: Signature, data: bytes) -> None: 
    202        """Raises if verification of signature over data fails. 
    203 
    204        Args: 
    205            signature: Signature object. 
    206            data: Payload bytes. 
    207 
    208        Raises: 
    209            UnverifiedSignatureError: Failed to verify signature. 
    210            VerificationError: Signature verification process error. If you 
    211                are only interested in the verify result, just handle 
    212                UnverifiedSignatureError: it contains VerificationError as well 
    213        """ 
    214        raise NotImplementedError 
    215 
    216 
    217class SSlibKey(Key): 
    218    """Key implementation for RSA, Ed25519, ECDSA keys""" 
    219 
    220    def __init__( 
    221        self, 
    222        keyid: str, 
    223        keytype: str, 
    224        scheme: str, 
    225        keyval: dict[str, Any], 
    226        unrecognized_fields: dict[str, Any] | None = None, 
    227    ): 
    228        if "public" not in keyval or not isinstance(keyval["public"], str): 
    229            raise ValueError(f"public key string required for scheme {scheme}") 
    230        super().__init__(keyid, keytype, scheme, keyval, unrecognized_fields) 
    231 
    232    def get_hash_algorithm_name(self) -> str: 
    233        """Get hash algorithm name for scheme. Raise 
    234        ValueError if the scheme is not a supported pre-hash scheme.""" 
    235        if self.scheme in [ 
    236            "rsassa-pss-sha224", 
    237            "rsassa-pss-sha256", 
    238            "rsassa-pss-sha384", 
    239            "rsassa-pss-sha512", 
    240            "rsa-pkcs1v15-sha224", 
    241            "rsa-pkcs1v15-sha256", 
    242            "rsa-pkcs1v15-sha384", 
    243            "rsa-pkcs1v15-sha512", 
    244            "ecdsa-sha2-nistp256", 
    245            "ecdsa-sha2-nistp384", 
    246        ]: 
    247            return f"sha{self.scheme[-3:]}" 
    248 
    249        elif self.scheme == "ecdsa-sha2-nistp521": 
    250            return "sha512" 
    251 
    252        raise ValueError(f"method not supported for scheme {self.scheme}") 
    253 
    254    def get_padding_name(self) -> str: 
    255        """Get padding name for scheme. Raise 
    256        ValueError if the scheme is not a supported padded rsa scheme.""" 
    257        if self.scheme in [ 
    258            "rsassa-pss-sha224", 
    259            "rsassa-pss-sha256", 
    260            "rsassa-pss-sha384", 
    261            "rsassa-pss-sha512", 
    262            "rsa-pkcs1v15-sha224", 
    263            "rsa-pkcs1v15-sha256", 
    264            "rsa-pkcs1v15-sha384", 
    265            "rsa-pkcs1v15-sha512", 
    266        ]: 
    267            return self.scheme.split("-")[1] 
    268 
    269        raise ValueError(f"method not supported for scheme {self.scheme}") 
    270 
    271    @classmethod 
    272    def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> SSlibKey: 
    273        keytype, scheme, keyval = cls._from_dict(key_dict) 
    274 
    275        # All fields left in the key_dict are unrecognized. 
    276        return cls(keyid, keytype, scheme, keyval, key_dict) 
    277 
    278    def to_dict(self) -> dict[str, Any]: 
    279        return self._to_dict() 
    280 
    281    def _crypto_key(self) -> PublicKeyTypes: 
    282        """Helper to get a `cryptography` public key for this SSlibKey.""" 
    283        public_bytes = self.keyval["public"].encode("utf-8") 
    284        return load_pem_public_key(public_bytes) 
    285 
    286    @staticmethod 
    287    def _from_crypto(public_key: PublicKeyTypes) -> tuple[str, str, str]: 
    288        """Return tuple of keytype, default scheme and serialized public key 
    289        value for the passed public key. 
    290 
    291        Raise ValueError if public key is not supported. 
    292        """ 
    293 
    294        def _raw() -> str: 
    295            return public_key.public_bytes( 
    296                encoding=Encoding.Raw, format=PublicFormat.Raw 
    297            ).hex() 
    298 
    299        def _pem() -> str: 
    300            return public_key.public_bytes( 
    301                encoding=Encoding.PEM, format=PublicFormat.SubjectPublicKeyInfo 
    302            ).decode() 
    303 
    304        if isinstance(public_key, RSAPublicKey): 
    305            return "rsa", "rsassa-pss-sha256", _pem() 
    306 
    307        if isinstance(public_key, EllipticCurvePublicKey): 
    308            if isinstance(public_key.curve, SECP256R1): 
    309                return "ecdsa", "ecdsa-sha2-nistp256", _pem() 
    310 
    311            if isinstance(public_key.curve, SECP384R1): 
    312                return "ecdsa", "ecdsa-sha2-nistp384", _pem() 
    313 
    314            if isinstance(public_key.curve, SECP521R1): 
    315                return "ecdsa", "ecdsa-sha2-nistp521", _pem() 
    316 
    317            raise ValueError(f"unsupported curve '{public_key.curve.name}'") 
    318 
    319        if isinstance(public_key, Ed25519PublicKey): 
    320            return "ed25519", "ed25519", _raw() 
    321 
    322        raise ValueError(f"unsupported key '{type(public_key)}'") 
    323 
    324    @classmethod 
    325    def from_crypto( 
    326        cls, 
    327        public_key: PublicKeyTypes, 
    328        keyid: str | None = None, 
    329        scheme: str | None = None, 
    330    ) -> SSlibKey: 
    331        """Create SSlibKey from pyca/cryptography public key. 
    332 
    333        Args: 
    334            public_key: pyca/cryptography public key object. 
    335            keyid: Key identifier. If not passed, a default keyid is computed. 
    336            scheme: SSlibKey signing scheme. Defaults are "rsassa-pss-sha256", 
    337                "ecdsa-sha2-nistp256", "ecdsa-sha2-nistp384" and "ed25519" 
    338                according to the keytype. 
    339 
    340        Raises: 
    341            UnsupportedLibraryError: pyca/cryptography not installed 
    342            ValueError: Key type not supported 
    343 
    344        Returns: 
    345            SSlibKey 
    346 
    347        """ 
    348        if CRYPTO_IMPORT_ERROR: 
    349            raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) 
    350 
    351        keytype, default_scheme, public_key_value = cls._from_crypto(public_key) 
    352 
    353        if not scheme: 
    354            scheme = default_scheme 
    355 
    356        keyval = {"public": public_key_value} 
    357 
    358        if not keyid: 
    359            keyid = compute_default_keyid(keytype, scheme, keyval) 
    360 
    361        return SSlibKey(keyid, keytype, scheme, keyval) 
    362 
    363    @staticmethod 
    364    def _get_rsa_padding(name: str, hash_algorithm: HashAlgorithm) -> AsymmetricPadding: 
    365        """Helper to return rsa signature padding for name.""" 
    366        padding: AsymmetricPadding 
    367        if name == "pss": 
    368            padding = PSS(mgf=MGF1(hash_algorithm), salt_length=PSS.AUTO) 
    369 
    370        if name == "pkcs1v15": 
    371            padding = PKCS1v15() 
    372 
    373        return padding 
    374 
    375    def _verify_ed25519_fallback(self, signature: bytes, data: bytes) -> None: 
    376        """Helper to verify ed25519 sig if pyca/cryptography is unavailable.""" 
    377        try: 
    378            public_bytes = bytes.fromhex(self.keyval["public"]) 
    379            checkvalid(signature, data, public_bytes) 
    380 
    381        except SignatureMismatch as e: 
    382            raise UnverifiedSignatureError from e 
    383 
    384    def _verify(self, signature: bytes, data: bytes) -> None: 
    385        """Helper to verify signature using pyca/cryptography (default).""" 
    386 
    387        def _validate_type(key: object, type_: type) -> None: 
    388            if not isinstance(key, type_): 
    389                raise ValueError(f"bad key {key} for {self.scheme}") 
    390 
    391        def _validate_curve( 
    392            key: EllipticCurvePublicKey, curve: type[EllipticCurve] 
    393        ) -> None: 
    394            if not isinstance(key.curve, curve): 
    395                raise ValueError(f"bad curve {key.curve} for {self.scheme}") 
    396 
    397        try: 
    398            key: PublicKeyTypes 
    399            if self.keytype == "rsa" and self.scheme in [ 
    400                "rsassa-pss-sha224", 
    401                "rsassa-pss-sha256", 
    402                "rsassa-pss-sha384", 
    403                "rsassa-pss-sha512", 
    404                "rsa-pkcs1v15-sha224", 
    405                "rsa-pkcs1v15-sha256", 
    406                "rsa-pkcs1v15-sha384", 
    407                "rsa-pkcs1v15-sha512", 
    408            ]: 
    409                key = cast(RSAPublicKey, self._crypto_key()) 
    410                _validate_type(key, RSAPublicKey) 
    411                hash_name = self.get_hash_algorithm_name() 
    412                hash_algorithm = get_hash_algorithm(hash_name) 
    413                padding_name = self.get_padding_name() 
    414                padding = self._get_rsa_padding(padding_name, hash_algorithm) 
    415                key.verify(signature, data, padding, hash_algorithm) 
    416 
    417            elif ( 
    418                self.keytype in ["ecdsa", "ecdsa-sha2-nistp256"] 
    419                and self.scheme == "ecdsa-sha2-nistp256" 
    420            ): 
    421                key = cast(EllipticCurvePublicKey, self._crypto_key()) 
    422                _validate_type(key, EllipticCurvePublicKey) 
    423                _validate_curve(key, SECP256R1) 
    424                key.verify(signature, data, ECDSA(SHA256())) 
    425 
    426            elif ( 
    427                self.keytype in ["ecdsa", "ecdsa-sha2-nistp384"] 
    428                and self.scheme == "ecdsa-sha2-nistp384" 
    429            ): 
    430                key = cast(EllipticCurvePublicKey, self._crypto_key()) 
    431                _validate_type(key, EllipticCurvePublicKey) 
    432                _validate_curve(key, SECP384R1) 
    433                key.verify(signature, data, ECDSA(SHA384())) 
    434 
    435            elif ( 
    436                self.keytype in ["ecdsa", "ecdsa-sha2-nistp521"] 
    437                and self.scheme == "ecdsa-sha2-nistp521" 
    438            ): 
    439                key = cast(EllipticCurvePublicKey, self._crypto_key()) 
    440                _validate_type(key, EllipticCurvePublicKey) 
    441                _validate_curve(key, SECP521R1) 
    442                key.verify(signature, data, ECDSA(SHA512())) 
    443 
    444            elif self.keytype == "ed25519" and self.scheme == "ed25519": 
    445                public_bytes = bytes.fromhex(self.keyval["public"]) 
    446                key = Ed25519PublicKey.from_public_bytes(public_bytes) 
    447                key.verify(signature, data) 
    448 
    449            else: 
    450                raise ValueError(f"Unsupported public key {self.keytype}/{self.scheme}") 
    451 
    452        except InvalidSignature as e: 
    453            raise UnverifiedSignatureError from e 
    454 
    455    def verify_signature(self, signature: Signature, data: bytes) -> None: 
    456        try: 
    457            if signature.keyid != self.keyid: 
    458                raise ValueError( 
    459                    f"keyid mismatch: 'key id: {self.keyid}" 
    460                    f" != signature keyid: {signature.keyid}'" 
    461                ) 
    462 
    463            signature_bytes = bytes.fromhex(signature.signature) 
    464 
    465            if CRYPTO_IMPORT_ERROR: 
    466                if self.scheme != "ed25519": 
    467                    raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) 
    468 
    469                return self._verify_ed25519_fallback(signature_bytes, data) 
    470 
    471            return self._verify(signature_bytes, data) 
    472 
    473        except UnverifiedSignatureError as e: 
    474            raise UnverifiedSignatureError( 
    475                f"Failed to verify signature by {self.keyid}" 
    476            ) from e 
    477 
    478        except Exception as e: 
    479            logger.info("Key %s failed to verify sig: %s", self.keyid, e) 
    480            raise VerificationError( 
    481                f"Unknown failure to verify signature by {self.keyid}" 
    482            ) from e