1"""Signer implementation for Google Cloud KMS""" 
    2 
    3from __future__ import annotations 
    4 
    5import hashlib 
    6import logging 
    7from urllib import parse 
    8 
    9from securesystemslib import exceptions 
    10from securesystemslib.signer._key import Key, SSlibKey 
    11from securesystemslib.signer._signer import SecretsHandler, Signature, Signer 
    12from securesystemslib.signer._utils import compute_default_keyid 
    13 
# Module-level logger; GCPSigner.sign() uses it for debug output.
logger = logging.getLogger(__name__)

# Holds an error message when the optional google-cloud-kms dependency could
# not be imported, and stays None otherwise. GCPSigner checks this flag before
# touching `kms` or KEYTYPES_AND_SCHEMES, which only exist if the import worked.
GCP_IMPORT_ERROR = None
try:
    from google.cloud import kms
    from google.cloud.kms_v1.types import CryptoKeyVersion

    # Maps each supported KMS signing algorithm to the (keytype, scheme) pair
    # used for the corresponding public key. The RSA key size is not part of
    # the scheme name, so several KMS algorithms share the same pair.
    KEYTYPES_AND_SCHEMES = {
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P256_SHA256: (
            "ecdsa",
            "ecdsa-sha2-nistp256",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P384_SHA384: (
            "ecdsa",
            "ecdsa-sha2-nistp384",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_2048_SHA256: (
            "rsa",
            "rsassa-pss-sha256",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_3072_SHA256: (
            "rsa",
            "rsassa-pss-sha256",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_4096_SHA256: (
            "rsa",
            "rsassa-pss-sha256",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_4096_SHA512: (
            "rsa",
            "rsassa-pss-sha512",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_2048_SHA256: (
            "rsa",
            "rsa-pkcs1v15-sha256",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_3072_SHA256: (
            "rsa",
            "rsa-pkcs1v15-sha256",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_4096_SHA256: (
            "rsa",
            "rsa-pkcs1v15-sha256",
        ),
        CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_4096_SHA512: (
            "rsa",
            "rsa-pkcs1v15-sha512",
        ),
    }
except ImportError:
    # Importing this module must not fail just because the optional
    # dependency is missing; the error is raised later, on first use.
    GCP_IMPORT_ERROR = (
        "google-cloud-kms library required to sign with Google Cloud keys."
    )
    67 
    68 
    69class GCPSigner(Signer): 
    70    """Google Cloud KMS Signer 
    71 
    72    This Signer uses Google Cloud KMS to sign: the payload is hashed locally, 
    73    but the signature is created on the KMS. 
    74 
    75    The signer uses "ambient" credentials: typically environment var 
    76    GOOGLE_APPLICATION_CREDENTIALS that points to a file with valid 
    77    credentials. These will be found by google.cloud.kms, see 
    78    https://cloud.google.com/docs/authentication/getting-started. 
    79    Some practical authentication options include: 
    80    * GitHub Action: https://github.com/google-github-actions/auth 
    81    * gcloud CLI: https://cloud.google.com/sdk/gcloud 
    82 
    83    The specific permissions that GCPSigner needs are: 
    84    * roles/cloudkms.signer for sign() 
    85    * roles/cloudkms.publicKeyViewer for import() 
    86 
    87    Arguments: 
    88        gcp_keyid: Fully qualified GCP KMS key name, like 
    89            projects/python-tuf-kms/locations/global/keyRings/securesystemslib-tests/cryptoKeys/ecdsa-sha2-nistp256/cryptoKeyVersions/1 
    90        public_key: The related public key instance 
    91 
    92    Raises: 
    93        UnsupportedAlgorithmError: The payload hash algorithm is unsupported. 
    94        UnsupportedLibraryError: google.cloud.kms was not found 
    95        Various errors from google.cloud modules: e.g. 
    96            google.auth.exceptions.DefaultCredentialsError if ambient 
    97            credentials are not found 
    98    """ 
    99 
    100    SCHEME = "gcpkms" 
    101 
    102    def __init__(self, gcp_keyid: str, public_key: SSlibKey): 
    103        if GCP_IMPORT_ERROR: 
    104            raise exceptions.UnsupportedLibraryError(GCP_IMPORT_ERROR) 
    105 
    106        if (public_key.keytype, public_key.scheme) not in KEYTYPES_AND_SCHEMES.values(): 
    107            raise exceptions.UnsupportedAlgorithmError( 
    108                f"Unsupported key ({public_key.keytype}/{public_key.scheme}) " 
    109                f"in key {public_key.keyid}" 
    110            ) 
    111 
    112        self.hash_algorithm = public_key.get_hash_algorithm_name() 
    113        self.gcp_keyid = gcp_keyid 
    114        self._public_key = public_key 
    115        self.client = kms.KeyManagementServiceClient() 
    116 
    117    @property 
    118    def public_key(self) -> SSlibKey: 
    119        return self._public_key 
    120 
    121    @classmethod 
    122    def from_priv_key_uri( 
    123        cls, 
    124        priv_key_uri: str, 
    125        public_key: Key, 
    126        secrets_handler: SecretsHandler | None = None, 
    127    ) -> GCPSigner: 
    128        if not isinstance(public_key, SSlibKey): 
    129            raise ValueError(f"Expected SSlibKey for {priv_key_uri}") 
    130 
    131        uri = parse.urlparse(priv_key_uri) 
    132 
    133        if uri.scheme != cls.SCHEME: 
    134            raise ValueError(f"GCPSigner does not support {priv_key_uri}") 
    135 
    136        return cls(uri.path, public_key) 
    137 
    138    @classmethod 
    139    def import_(cls, gcp_keyid: str) -> tuple[str, SSlibKey]: 
    140        """Load key and signer details from KMS 
    141 
    142        Returns the private key uri and the public key. This method should only 
    143        be called once per key: the uri and Key should be stored for later use. 
    144        """ 
    145        if GCP_IMPORT_ERROR: 
    146            raise exceptions.UnsupportedLibraryError(GCP_IMPORT_ERROR) 
    147 
    148        client = kms.KeyManagementServiceClient() 
    149        request = {"name": gcp_keyid} 
    150        kms_pubkey = client.get_public_key(request) 
    151        try: 
    152            keytype, scheme = KEYTYPES_AND_SCHEMES[kms_pubkey.algorithm] 
    153        except KeyError as e: 
    154            raise exceptions.UnsupportedAlgorithmError( 
    155                f"{kms_pubkey.algorithm} is not a supported signing algorithm" 
    156            ) from e 
    157 
    158        keyval = {"public": kms_pubkey.pem} 
    159        keyid = compute_default_keyid(keytype, scheme, keyval) 
    160        public_key = SSlibKey(keyid, keytype, scheme, keyval) 
    161 
    162        return f"{cls.SCHEME}:{gcp_keyid}", public_key 
    163 
    164    def sign(self, payload: bytes) -> Signature: 
    165        """Signs payload with Google Cloud KMS. 
    166 
    167        Arguments: 
    168            payload: bytes to be signed. 
    169 
    170        Raises: 
    171            Various errors from google.cloud modules. 
    172 
    173        Returns: 
    174            Signature. 
    175        """ 
    176        # NOTE: request and response can contain CRC32C of the digest/sig: 
    177        # Verifying could be useful but would require another dependency... 
    178 
    179        hasher = hashlib.new(self.hash_algorithm) 
    180        hasher.update(payload) 
    181        digest = {self.hash_algorithm: hasher.digest()} 
    182        request = {"name": self.gcp_keyid, "digest": digest} 
    183 
    184        logger.debug("signing request %s", request) 
    185        response = self.client.asymmetric_sign(request) 
    186        logger.debug("signing response %s", response) 
    187 
    188        return Signature(self.public_key.keyid, response.signature.hex())