1"""Signer implementation for Azure Key Vault""" 
    2 
    3from __future__ import annotations 
    4 
    5import hashlib 
    6import logging 
    7from urllib import parse 
    8 
    9from securesystemslib.exceptions import UnsupportedLibraryError 
    10from securesystemslib.signer._key import Key, SSlibKey 
    11from securesystemslib.signer._signer import SecretsHandler, Signature, Signer 
    12from securesystemslib.signer._utils import compute_default_keyid 
    13 
# Stays None when all optional Azure / cryptography imports below succeed;
# otherwise holds the error message that AzureSigner.__init__() and
# AzureSigner.import_() raise as UnsupportedLibraryError.
AZURE_IMPORT_ERROR = None
try:
    from azure.core.exceptions import HttpResponseError
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.keys import KeyClient, KeyCurveName, KeyVaultKey
    from azure.keyvault.keys.crypto import (
        CryptographyClient,
        SignatureAlgorithm,
    )
    from cryptography.hazmat.primitives.asymmetric import ec
    from cryptography.hazmat.primitives.asymmetric.utils import (
        encode_dss_signature,
    )
    from cryptography.hazmat.primitives.serialization import (
        Encoding,
        PublicFormat,
    )

    # Maps an Azure curve name to the (keytype, scheme) pair used for the
    # corresponding SSlibKey. Only defined when the imports above succeed.
    KEYTYPES_AND_SCHEMES = {
        KeyCurveName.p_256: ("ecdsa", "ecdsa-sha2-nistp256"),
        KeyCurveName.p_384: ("ecdsa", "ecdsa-sha2-nistp384"),
        KeyCurveName.p_521: ("ecdsa", "ecdsa-sha2-nistp521"),
    }

    # Maps a key scheme to the Azure signature algorithm passed to
    # CryptographyClient.sign(). Only defined when the imports above succeed.
    SIGNATURE_ALGORITHMS = {
        "ecdsa-sha2-nistp256": SignatureAlgorithm.es256,
        "ecdsa-sha2-nistp384": SignatureAlgorithm.es384,
        "ecdsa-sha2-nistp521": SignatureAlgorithm.es512,
    }


except ImportError:
    # Defer the failure: the module stays importable, and the error is only
    # raised once an AzureSigner is actually constructed or a key is imported.
    AZURE_IMPORT_ERROR = (
        "Signing with Azure Key Vault requires azure-identity, "
        "azure-keyvault-keys and cryptography."
    )

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
    52 
    53 
    54class UnsupportedKeyType(Exception):  # noqa: N818 
    55    pass 
    56 
    57 
    58class AzureSigner(Signer): 
    59    """Azure Key Vault Signer 
    60 
    61    This Signer uses Azure Key Vault to sign. 
    62    Currently this signer only supports signing with EC keys. 
    63    RSA support will be added in a separate pull request. 
    64 
    65    The specific permissions that AzureSigner needs are: 
    66    * "Key Vault Crypto User" for import() and sign() 
    67 
    68    See https://learn.microsoft.com/en-us/azure/key-vault/general/rbac-guide?tabs=azure-cli 
    69    for a list of all built-in Azure Key Vault roles 
    70 
    71    Arguments: 
    72        az_key_uri: Fully qualified Azure Key Vault name, like 
    73            https://<vault-name>.vault.azure.net/keys/<key-name>/<version> 
    74        public_key: public key object 
    75 
    76    Raises: 
    77        Various errors from azure.identity 
    78        Various errors from azure.keyvault.keys 
    79    """ 
    80 
    81    SCHEME = "azurekms" 
    82 
    83    def __init__(self, az_key_uri: str, public_key: SSlibKey): 
    84        if AZURE_IMPORT_ERROR: 
    85            raise UnsupportedLibraryError(AZURE_IMPORT_ERROR) 
    86 
    87        if (public_key.keytype, public_key.scheme) not in KEYTYPES_AND_SCHEMES.values(): 
    88            logger.info("only EC keys are supported for now") 
    89            raise UnsupportedKeyType( 
    90                "Supplied key must be an EC key on curve " 
    91                "nistp256, nistp384, or nistp521" 
    92            ) 
    93 
    94        cred = DefaultAzureCredential() 
    95        self.crypto_client = CryptographyClient( 
    96            az_key_uri, 
    97            credential=cred, 
    98        ) 
    99        self.signature_algorithm = SIGNATURE_ALGORITHMS[public_key.scheme] 
    100        self.hash_algorithm = public_key.get_hash_algorithm_name() 
    101        self._public_key = public_key 
    102 
    103    @property 
    104    def public_key(self) -> SSlibKey: 
    105        return self._public_key 
    106 
    107    @staticmethod 
    108    def _get_key_vault_key( 
    109        cred: DefaultAzureCredential, 
    110        vault_name: str, 
    111        key_name: str, 
    112    ) -> KeyVaultKey: 
    113        """Return KeyVaultKey created from the Vault name and key name""" 
    114        vault_url = f"https://{vault_name}.vault.azure.net/" 
    115 
    116        try: 
    117            key_client = KeyClient(vault_url=vault_url, credential=cred) 
    118            return key_client.get_key(key_name) 
    119        except (HttpResponseError,) as e: 
    120            logger.info( 
    121                "Key %s/%s failed to create key client from credentials, " 
    122                "key ID, and Vault URL: %s", 
    123                vault_name, 
    124                key_name, 
    125                str(e), 
    126            ) 
    127            raise e 
    128 
    129    @staticmethod 
    130    def _create_crypto_client( 
    131        cred: DefaultAzureCredential, 
    132        kv_key: KeyVaultKey, 
    133    ) -> CryptographyClient: 
    134        """Return CryptographyClient created Azure credentials and a KeyVaultKey""" 
    135        try: 
    136            return CryptographyClient(kv_key, credential=cred) 
    137        except (HttpResponseError,) as e: 
    138            logger.info( 
    139                "Key %s failed to create crypto client from " 
    140                "credentials and KeyVaultKey: %s", 
    141                kv_key, 
    142                str(e), 
    143            ) 
    144            raise e 
    145 
    146    @staticmethod 
    147    def _get_keytype_and_scheme(crv: str) -> tuple[str, str]: 
    148        try: 
    149            return KEYTYPES_AND_SCHEMES[crv] 
    150        except KeyError: 
    151            raise UnsupportedKeyType("Unsupported curve supplied by key") 
    152 
    153    @classmethod 
    154    def from_priv_key_uri( 
    155        cls, 
    156        priv_key_uri: str, 
    157        public_key: Key, 
    158        secrets_handler: SecretsHandler | None = None, 
    159    ) -> AzureSigner: 
    160        if not isinstance(public_key, SSlibKey): 
    161            raise ValueError(f"Expected SSlibKey for {priv_key_uri}") 
    162 
    163        uri = parse.urlparse(priv_key_uri) 
    164 
    165        if uri.scheme != cls.SCHEME: 
    166            raise ValueError(f"AzureSigner does not support {priv_key_uri}") 
    167 
    168        az_key_uri = priv_key_uri.replace("azurekms:", "https:") 
    169        return cls(az_key_uri, public_key) 
    170 
    171    @classmethod 
    172    def import_(cls, az_vault_name: str, az_key_name: str) -> tuple[str, SSlibKey]: 
    173        """Load key and signer details from KMS 
    174 
    175        Returns the private key uri and the public key. This method should only 
    176        be called once per key: the uri and Key should be stored for later use. 
    177        """ 
    178        if AZURE_IMPORT_ERROR: 
    179            raise UnsupportedLibraryError(AZURE_IMPORT_ERROR) 
    180 
    181        credential = DefaultAzureCredential() 
    182        key_vault_key = cls._get_key_vault_key(credential, az_vault_name, az_key_name) 
    183 
    184        if not key_vault_key.key.kty.startswith("EC"): 
    185            raise UnsupportedKeyType(f"Unsupported key type {key_vault_key.key.kty}") 
    186 
    187        if key_vault_key.key.crv == KeyCurveName.p_256: 
    188            crv: ec.EllipticCurve = ec.SECP256R1() 
    189        elif key_vault_key.key.crv == KeyCurveName.p_384: 
    190            crv = ec.SECP384R1() 
    191        elif key_vault_key.key.crv == KeyCurveName.p_521: 
    192            crv = ec.SECP521R1() 
    193        else: 
    194            raise UnsupportedKeyType(f"Unsupported curve type {key_vault_key.key.crv}") 
    195 
    196        # Key is in JWK format, create a curve from it with the parameters 
    197        x = int.from_bytes(key_vault_key.key.x, byteorder="big") 
    198        y = int.from_bytes(key_vault_key.key.y, byteorder="big") 
    199 
    200        cpub = ec.EllipticCurvePublicNumbers(x, y, crv) 
    201        pub_key = cpub.public_key() 
    202        pem = pub_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) 
    203 
    204        keytype, scheme = cls._get_keytype_and_scheme(key_vault_key.key.crv) 
    205        keyval = {"public": pem.decode("utf-8")} 
    206        keyid = compute_default_keyid(keytype, scheme, keyval) 
    207        public_key = SSlibKey(keyid, keytype, scheme, keyval) 
    208        priv_key_uri = key_vault_key.key.kid.replace("https:", "azurekms:") 
    209 
    210        return priv_key_uri, public_key 
    211 
    212    def sign(self, payload: bytes) -> Signature: 
    213        """Signs payload with Azure Key Vault. 
    214 
    215        Arguments: 
    216            payload: bytes to be signed. 
    217 
    218        Raises: 
    219            Various errors from azure.keyvault.keys. 
    220 
    221        Returns: 
    222            Signature. 
    223        """ 
    224 
    225        hasher = hashlib.new(self.hash_algorithm) 
    226        hasher.update(payload) 
    227        digest = hasher.digest() 
    228        response = self.crypto_client.sign(self.signature_algorithm, digest) 
    229 
    230        # This code is copied from: 
    231        # https://github.com/secure-systems-lab/securesystemslib/blob/135567fa04f10d0c6a4cd32eb45ce736e1f50a93/securesystemslib/signer/_hsm_signer.py#L379 
    232        # 
    233        # The PKCS11 signature octets correspond to the concatenation of the 
    234        # ECDSA values r and s, both represented as an octet string of equal 
    235        # length of at most nLen with the most significant byte first (i.e. 
    236        # big endian) 
    237        # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178 
    238        r_s_len = int(len(response.signature) / 2) 
    239        r = int.from_bytes(response.signature[:r_s_len], byteorder="big") 
    240        s = int.from_bytes(response.signature[r_s_len:], byteorder="big") 
    241 
    242        # Create an ASN.1 encoded Dss-Sig-Value to be used with 
    243        # pyca/cryptography 
    244        dss_sig_value = encode_dss_signature(r, s).hex() 
    245 
    246        return Signature(self.public_key.keyid, dss_sig_value)