Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/securesystemslib/signer/_vault_signer.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

50 statements  

1"""Signer implementation for HashiCorp Vault (Transit secrets engine)""" 

2 

3from __future__ import annotations 

4 

5from base64 import b64decode, b64encode 

6from urllib import parse 

7 

8from securesystemslib.exceptions import UnsupportedLibraryError 

9from securesystemslib.signer._key import Key, SSlibKey 

10from securesystemslib.signer._signer import SecretsHandler, Signature, Signer 

11 

12VAULT_IMPORT_ERROR = None 

13try: 

14 import hvac 

15 from cryptography.hazmat.primitives.asymmetric.ed25519 import ( 

16 Ed25519PublicKey, 

17 ) 

18 

19except ImportError: 

20 VAULT_IMPORT_ERROR = "Signing with HashiCorp Vault requires hvac and cryptography." 

21 

22 

23class VaultSigner(Signer): 

24 """Signer for HashiCorp Vault Transit secrets engine 

25 

26 The signer uses "ambient" credentials to connect to vault, most notably 

27 the environment variables ``VAULT_ADDR`` and ``VAULT_TOKEN`` must be set: 

28 https://developer.hashicorp.com/vault/docs/commands#environment-variables 

29 

30 Priv key uri format is: ``hv:<KEY NAME>/<KEY VERSION>``. 

31 

32 Arguments: 

33 hv_key_name: Name of vault key used for signing. 

34 public_key: Related public key instance. 

35 hv_key_version: Version of vault key used for signing. 

36 

37 Raises: 

38 UnsupportedLibraryError: hvac or cryptography are not installed. 

39 """ 

40 

41 SCHEME = "hv" 

42 

43 def __init__(self, hv_key_name: str, public_key: SSlibKey, hv_key_version: int): 

44 if VAULT_IMPORT_ERROR: 

45 raise UnsupportedLibraryError(VAULT_IMPORT_ERROR) 

46 

47 self.hv_key_name = hv_key_name 

48 self._public_key = public_key 

49 self.hv_key_version = hv_key_version 

50 

51 # Client caches ambient settings in __init__. This means settings are 

52 # stable for subsequent calls to sign, also if the environment changes. 

53 self._client = hvac.Client() 

54 

55 def sign(self, payload: bytes) -> Signature: 

56 """Signs payload with HashiCorp Vault Transit secrets engine. 

57 

58 Arguments: 

59 payload: bytes to be signed. 

60 

61 Raises: 

62 Various errors from hvac. 

63 

64 Returns: 

65 Signature. 

66 """ 

67 resp = self._client.secrets.transit.sign_data( 

68 self.hv_key_name, 

69 hash_input=b64encode(payload).decode(), 

70 key_version=self.hv_key_version, 

71 ) 

72 

73 sig_b64 = resp["data"]["signature"].split(":")[2] 

74 sig = b64decode(sig_b64).hex() 

75 

76 return Signature(self.public_key.keyid, sig) 

77 

78 @property 

79 def public_key(self) -> SSlibKey: 

80 return self._public_key 

81 

82 @classmethod 

83 def from_priv_key_uri( 

84 cls, 

85 priv_key_uri: str, 

86 public_key: Key, 

87 secrets_handler: SecretsHandler | None = None, 

88 ) -> VaultSigner: 

89 if not isinstance(public_key, SSlibKey): 

90 raise ValueError(f"Expected SSlibKey for {priv_key_uri}") 

91 

92 uri = parse.urlparse(priv_key_uri) 

93 

94 if uri.scheme != cls.SCHEME: 

95 raise ValueError(f"VaultSigner does not support {priv_key_uri}") 

96 

97 name, version = uri.path.split("/") 

98 

99 return cls(name, public_key, int(version)) 

100 

101 @classmethod 

102 def import_(cls, hv_key_name: str) -> tuple[str, SSlibKey]: 

103 """Load key and signer details from HashiCorp Vault. 

104 

105 If multiple keys exist in the vault under the passed name, only the 

106 newest key is returned. Supported key type is: ed25519 

107 

108 See class documentation for details about settings and uri format. 

109 

110 Arguments: 

111 hv_key_name: Name of vault key to import. 

112 

113 Raises: 

114 UnsupportedLibraryError: hvac or cryptography are not installed. 

115 Various errors from hvac. 

116 

117 Returns: 

118 Private key uri and public key. 

119 

120 """ 

121 if VAULT_IMPORT_ERROR: 

122 raise UnsupportedLibraryError(VAULT_IMPORT_ERROR) 

123 

124 client = hvac.Client() 

125 resp = client.secrets.transit.read_key(hv_key_name) 

126 

127 # Pick key with highest version number 

128 version, key_info = sorted(resp["data"]["keys"].items())[-1] 

129 

130 crypto_key = Ed25519PublicKey.from_public_bytes( 

131 b64decode(key_info["public_key"]) 

132 ) 

133 

134 key = SSlibKey.from_crypto(crypto_key) 

135 uri = f"{VaultSigner.SCHEME}:{hv_key_name}/{version}" 

136 

137 return uri, key