1"""Signer implementation for HashiCorp Vault (Transit secrets engine)""" 
    2 
    3from __future__ import annotations 
    4 
    5from base64 import b64decode, b64encode 
    6from urllib import parse 
    7 
    8from securesystemslib.exceptions import UnsupportedLibraryError 
    9from securesystemslib.signer._key import Key, SSlibKey 
    10from securesystemslib.signer._signer import SecretsHandler, Signature, Signer 
    11 
    12VAULT_IMPORT_ERROR = None 
    13try: 
    14    import hvac 
    15    from cryptography.hazmat.primitives.asymmetric.ed25519 import ( 
    16        Ed25519PublicKey, 
    17    ) 
    18 
    19except ImportError: 
    20    VAULT_IMPORT_ERROR = "Signing with HashiCorp Vault requires hvac and cryptography." 
    21 
    22 
    23class VaultSigner(Signer): 
    24    """Signer for HashiCorp Vault Transit secrets engine 
    25 
    26    The signer uses "ambient" credentials to connect to vault, most notably 
    27    the environment variables ``VAULT_ADDR`` and ``VAULT_TOKEN`` must be set: 
    28    https://developer.hashicorp.com/vault/docs/commands#environment-variables 
    29 
    30    Priv key uri format is: ``hv:<KEY NAME>/<KEY VERSION>``. 
    31 
    32    Arguments: 
    33        hv_key_name: Name of vault key used for signing. 
    34        public_key: Related public key instance. 
    35        hv_key_version: Version of vault key used for signing. 
    36 
    37    Raises: 
    38        UnsupportedLibraryError: hvac or cryptography are not installed. 
    39    """ 
    40 
    41    SCHEME = "hv" 
    42 
    43    def __init__(self, hv_key_name: str, public_key: SSlibKey, hv_key_version: int): 
    44        if VAULT_IMPORT_ERROR: 
    45            raise UnsupportedLibraryError(VAULT_IMPORT_ERROR) 
    46 
    47        self.hv_key_name = hv_key_name 
    48        self._public_key = public_key 
    49        self.hv_key_version = hv_key_version 
    50 
    51        # Client caches ambient settings in __init__. This means settings are 
    52        # stable for subsequent calls to sign, also if the environment changes. 
    53        self._client = hvac.Client() 
    54 
    55    def sign(self, payload: bytes) -> Signature: 
    56        """Signs payload with HashiCorp Vault Transit secrets engine. 
    57 
    58        Arguments: 
    59            payload: bytes to be signed. 
    60 
    61        Raises: 
    62            Various errors from hvac. 
    63 
    64        Returns: 
    65            Signature. 
    66        """ 
    67        resp = self._client.secrets.transit.sign_data( 
    68            self.hv_key_name, 
    69            hash_input=b64encode(payload).decode(), 
    70            key_version=self.hv_key_version, 
    71        ) 
    72 
    73        sig_b64 = resp["data"]["signature"].split(":")[2] 
    74        sig = b64decode(sig_b64).hex() 
    75 
    76        return Signature(self.public_key.keyid, sig) 
    77 
    78    @property 
    79    def public_key(self) -> SSlibKey: 
    80        return self._public_key 
    81 
    82    @classmethod 
    83    def from_priv_key_uri( 
    84        cls, 
    85        priv_key_uri: str, 
    86        public_key: Key, 
    87        secrets_handler: SecretsHandler | None = None, 
    88    ) -> VaultSigner: 
    89        if not isinstance(public_key, SSlibKey): 
    90            raise ValueError(f"Expected SSlibKey for {priv_key_uri}") 
    91 
    92        uri = parse.urlparse(priv_key_uri) 
    93 
    94        if uri.scheme != cls.SCHEME: 
    95            raise ValueError(f"VaultSigner does not support {priv_key_uri}") 
    96 
    97        name, version = uri.path.split("/") 
    98 
    99        return cls(name, public_key, int(version)) 
    100 
    101    @classmethod 
    102    def import_(cls, hv_key_name: str) -> tuple[str, SSlibKey]: 
    103        """Load key and signer details from HashiCorp Vault. 
    104 
    105        If multiple keys exist in the vault under the passed name, only the 
    106        newest key is returned. Supported key type is: ed25519 
    107 
    108        See class documentation for details about settings and uri format. 
    109 
    110        Arguments: 
    111            hv_key_name: Name of vault key to import. 
    112 
    113        Raises: 
    114            UnsupportedLibraryError: hvac or cryptography are not installed. 
    115            Various errors from hvac. 
    116 
    117        Returns: 
    118            Private key uri and public key. 
    119 
    120        """ 
    121        if VAULT_IMPORT_ERROR: 
    122            raise UnsupportedLibraryError(VAULT_IMPORT_ERROR) 
    123 
    124        client = hvac.Client() 
    125        resp = client.secrets.transit.read_key(hv_key_name) 
    126 
    127        # Pick key with highest version number 
    128        version, key_info = sorted(resp["data"]["keys"].items())[-1] 
    129 
    130        crypto_key = Ed25519PublicKey.from_public_bytes( 
    131            b64decode(key_info["public_key"]) 
    132        ) 
    133 
    134        key = SSlibKey.from_crypto(crypto_key) 
    135        uri = f"{VaultSigner.SCHEME}:{hv_key_name}/{version}" 
    136 
    137        return uri, key