1"""Signer implementation for HashiCorp Vault (Transit secrets engine)"""
2
3from __future__ import annotations
4
5from base64 import b64decode, b64encode
6from urllib import parse
7
8from securesystemslib.exceptions import UnsupportedLibraryError
9from securesystemslib.signer._key import Key, SSlibKey
10from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
11
12VAULT_IMPORT_ERROR = None
13try:
14 import hvac
15 from cryptography.hazmat.primitives.asymmetric.ed25519 import (
16 Ed25519PublicKey,
17 )
18
19except ImportError:
20 VAULT_IMPORT_ERROR = "Signing with HashiCorp Vault requires hvac and cryptography."
21
22
23class VaultSigner(Signer):
24 """Signer for HashiCorp Vault Transit secrets engine
25
26 The signer uses "ambient" credentials to connect to vault, most notably
27 the environment variables ``VAULT_ADDR`` and ``VAULT_TOKEN`` must be set:
28 https://developer.hashicorp.com/vault/docs/commands#environment-variables
29
30 Priv key uri format is: ``hv:<KEY NAME>/<KEY VERSION>``.
31
32 Arguments:
33 hv_key_name: Name of vault key used for signing.
34 public_key: Related public key instance.
35 hv_key_version: Version of vault key used for signing.
36
37 Raises:
38 UnsupportedLibraryError: hvac or cryptography are not installed.
39 """
40
41 SCHEME = "hv"
42
43 def __init__(self, hv_key_name: str, public_key: SSlibKey, hv_key_version: int):
44 if VAULT_IMPORT_ERROR:
45 raise UnsupportedLibraryError(VAULT_IMPORT_ERROR)
46
47 self.hv_key_name = hv_key_name
48 self._public_key = public_key
49 self.hv_key_version = hv_key_version
50
51 # Client caches ambient settings in __init__. This means settings are
52 # stable for subsequent calls to sign, also if the environment changes.
53 self._client = hvac.Client()
54
55 def sign(self, payload: bytes) -> Signature:
56 """Signs payload with HashiCorp Vault Transit secrets engine.
57
58 Arguments:
59 payload: bytes to be signed.
60
61 Raises:
62 Various errors from hvac.
63
64 Returns:
65 Signature.
66 """
67 resp = self._client.secrets.transit.sign_data(
68 self.hv_key_name,
69 hash_input=b64encode(payload).decode(),
70 key_version=self.hv_key_version,
71 )
72
73 sig_b64 = resp["data"]["signature"].split(":")[2]
74 sig = b64decode(sig_b64).hex()
75
76 return Signature(self.public_key.keyid, sig)
77
78 @property
79 def public_key(self) -> SSlibKey:
80 return self._public_key
81
82 @classmethod
83 def from_priv_key_uri(
84 cls,
85 priv_key_uri: str,
86 public_key: Key,
87 secrets_handler: SecretsHandler | None = None,
88 ) -> VaultSigner:
89 if not isinstance(public_key, SSlibKey):
90 raise ValueError(f"Expected SSlibKey for {priv_key_uri}")
91
92 uri = parse.urlparse(priv_key_uri)
93
94 if uri.scheme != cls.SCHEME:
95 raise ValueError(f"VaultSigner does not support {priv_key_uri}")
96
97 name, version = uri.path.split("/")
98
99 return cls(name, public_key, int(version))
100
101 @classmethod
102 def import_(cls, hv_key_name: str) -> tuple[str, SSlibKey]:
103 """Load key and signer details from HashiCorp Vault.
104
105 If multiple keys exist in the vault under the passed name, only the
106 newest key is returned. Supported key type is: ed25519
107
108 See class documentation for details about settings and uri format.
109
110 Arguments:
111 hv_key_name: Name of vault key to import.
112
113 Raises:
114 UnsupportedLibraryError: hvac or cryptography are not installed.
115 Various errors from hvac.
116
117 Returns:
118 Private key uri and public key.
119
120 """
121 if VAULT_IMPORT_ERROR:
122 raise UnsupportedLibraryError(VAULT_IMPORT_ERROR)
123
124 client = hvac.Client()
125 resp = client.secrets.transit.read_key(hv_key_name)
126
127 # Pick key with highest version number
128 version, key_info = sorted(resp["data"]["keys"].items())[-1]
129
130 crypto_key = Ed25519PublicKey.from_public_bytes(
131 b64decode(key_info["public_key"])
132 )
133
134 key = SSlibKey.from_crypto(crypto_key)
135 uri = f"{VaultSigner.SCHEME}:{hv_key_name}/{version}"
136
137 return uri, key