Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_utils.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

111 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Shared utilities. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import hashlib 

23import sys 

24from typing import IO, NewType, Union 

25 

26from cryptography.hazmat.primitives import serialization 

27from cryptography.hazmat.primitives.asymmetric import ec, rsa 

28from cryptography.x509 import ( 

29 Certificate, 

30 ExtensionNotFound, 

31 Version, 

32 load_der_x509_certificate, 

33) 

34from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID 

35from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm 

36 

37from sigstore import hashes as sigstore_hashes 

38from sigstore.errors import VerificationError 

39 

40if sys.version_info < (3, 11): 

41 import importlib_resources as resources 

42else: 

43 from importlib import resources 

44 

45 

# The public key classes this module works with: RSA or elliptic-curve.
# This is the return type of the `load_*_public_key` helpers below.
PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]

# The class objects (not instances) of the key types above; tuples of these
# are passed as the `types` filter to the `load_*_public_key` helpers, which
# use them in an `isinstance` check.
PublicKeyTypes = Union[type[rsa.RSAPublicKey], type[ec.EllipticCurvePublicKey]]

HexStr = NewType("HexStr", str)
"""
A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`).
"""
B64Str = NewType("B64Str", str)
"""
A newtype for `str` objects that contain base64 encoded strings.
"""
KeyID = NewType("KeyID", bytes)
"""
A newtype for `bytes` objects that contain a key id.
"""

62 

63 

def load_pem_public_key(
    key_pem: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
) -> PublicKey:
    """
    Parse a PEM-encoded public key, restricted to the key types in `types`.

    This wraps `cryptography`'s `serialization.load_pem_public_key` so that
    every failure surfaces as a `VerificationError`: either the input could
    not be loaded at all, or the loaded key is not an instance of one of
    the classes in `types`.
    """

    try:
        loaded = serialization.load_pem_public_key(key_pem)
    except Exception as load_error:
        # Deliberately broad: callers get one uniform exception type no
        # matter how loading failed.
        raise VerificationError(
            "could not load PEM-formatted public key"
        ) from load_error

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    raise VerificationError(f"invalid key format: not one of {types}")

84 

85 

def load_der_public_key(
    key_der: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
) -> PublicKey:
    """
    Parse a DER-encoded public key, restricted to the key types in `types`.

    This wraps `cryptography`'s `serialization.load_der_public_key` so that
    every failure surfaces as a `VerificationError`: either the input could
    not be loaded at all, or the loaded key is not an instance of one of
    the classes in `types`.
    """

    try:
        loaded = serialization.load_der_public_key(key_der)
    except Exception as load_error:
        # Deliberately broad: callers get one uniform exception type no
        # matter how loading failed.
        raise VerificationError(
            "could not load DER-formatted public key"
        ) from load_error

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    raise VerificationError(f"invalid key format: not one of {types}")

104 

105 

def base64_encode_pem_cert(cert: Certificate) -> B64Str:
    """
    Serialize `cert` to its PEM encoding, base64-encode those PEM bytes,
    and return the result as a `B64Str`.
    """

    pem_bytes = cert.public_bytes(serialization.Encoding.PEM)
    encoded = base64.b64encode(pem_bytes)
    return B64Str(encoded.decode())

114 

115 

def cert_der_to_pem(der: bytes) -> str:
    """
    Re-encode a DER-encoded X.509 certificate as PEM.

    The DER input is loaded as a certificate and then serialized back out,
    and the PEM bytes are decoded into a `str`.
    """

    # NOTE: The round-trip through a certificate object isn't strictly
    # required, since DER-to-PEM is an entirely mechanical transformation.
    pem_bytes = load_der_x509_certificate(der).public_bytes(
        serialization.Encoding.PEM
    )
    return pem_bytes.decode()

127 

128 

def key_id(key: PublicKey) -> KeyID:
    """
    Compute an RFC 6962-style "key ID" for `key`: the SHA-256 digest of
    the key's DER-encoded SubjectPublicKeyInfo.

    See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2>
    """
    spki_der = key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashlib.sha256(spki_der).digest()

    return KeyID(digest)

141 

142 

def sha256_digest(
    input_: bytes | IO[bytes] | sigstore_hashes.Hashed,
) -> sigstore_hashes.Hashed:
    """
    Produce a SHA256 `Hashed` for the given input.

    A `Hashed` input is handed back unchanged; a `bytes` buffer is hashed
    in one shot; anything else is treated as a stream and hashed
    incrementally.
    """
    if isinstance(input_, sigstore_hashes.Hashed):
        return input_

    if isinstance(input_, bytes):
        # Already fully in memory: no reason to route it through an
        # I/O abstraction.
        raw_digest = hashlib.sha256(input_).digest()
    else:
        raw_digest = _sha256_streaming(input_)

    return sigstore_hashes.Hashed(
        digest=raw_digest, algorithm=HashAlgorithm.SHA2_256
    )

163 

164 

def _sha256_streaming(io: IO[bytes]) -> bytes:
    """
    Compute the SHA256 of a stream and return the raw digest bytes.

    This function does its own internal buffering, so an unbuffered stream
    should be supplied for optimal performance.

    Streams that provide `readinto` are read into a single reusable buffer.
    Streams that only provide `read` are consumed in chunks of the same
    size instead. Reading stops at the first falsy result (`0`, `None`,
    or `b""`).
    """

    # NOTE: This function performs a SHA256 digest over a stream.
    # The stream's size is not checked, meaning that the stream's source
    # is implicitly trusted: if an attacker is able to truncate the stream's
    # source prematurely, then they could conceivably produce a digest
    # for a partial stream. This in turn could conceivably result
    # in a valid signature for an unintended (truncated) input.
    #
    # This is currently outside of sigstore-python's threat model: we
    # assume that the stream is trusted.
    #
    # See: https://github.com/sigstore/sigstore-python/pull/329#discussion_r1041215972

    sha256 = hashlib.sha256()
    # Per coreutils' ioblksize.h: 128KB performs optimally across a range
    # of systems in terms of minimizing syscall overhead.
    chunk_size = 128 * 1024

    readinto = getattr(io, "readinto", None)
    if readinto is None:
        # `IO[bytes]` does not declare `readinto`, so fall back to plain
        # chunked reads for streams that don't offer it.
        while chunk := io.read(chunk_size):
            sha256.update(chunk)
        return sha256.digest()

    # Reuse one buffer across reads to avoid a fresh allocation per chunk.
    view = memoryview(bytearray(chunk_size))
    while nbytes := readinto(view):
        sha256.update(view[:nbytes])

    return sha256.digest()

196 

197 

def read_embedded(name: str, prefix: str) -> bytes:
    """
    Return the raw bytes of the resource `name` located under `prefix`
    within the `sigstore._store` package.
    """
    store = resources.files("sigstore._store")
    contents: bytes = store.joinpath(prefix, name).read_bytes()
    return contents

205 

206 

def cert_is_ca(cert: Certificate) -> bool:
    """
    Report whether `cert` carries the interior state of a CA certificate.

    Returns `True` when both `BasicConstraints.ca` and
    `KeyUsage.keyCertSign` are set, and `False` when neither is set or
    when the certificate has no BasicConstraints extension at all.

    Raises `VerificationError` when the certificate is not X.509 v3, when
    BasicConstraints is present but not critical, when KeyUsage is
    missing, or when exactly one of the two CA indicators is set.

    This says nothing about whether the certificate is trustworthy, and it
    is **not** naively invertible: use the dedicated `cert_is_leaf` utility
    to decide whether a certificate is a valid Sigstore leaf.
    """

    # Sigstore only deals in v3 certificates: older X.509 versions have
    # no extensions and leave CA status ambiguous.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # A valid CA certificate has both of:
    #
    # * `BasicKeyUsage.keyCertSign`
    # * `BasicConstraints.ca`
    #
    # A certificate with only one of them is inconsistent, and is treated
    # as invalid rather than as a non-CA leaf.

    try:
        basic_constraints = cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS
        )
    except ExtensionNotFound:
        # Without BasicConstraints this cannot be a CA.
        return False

    # RFC 5280 4.2.1.9 requires BasicConstraints to be critical.
    if not basic_constraints.critical:
        raise VerificationError(
            "invalid X.509 certificate: non-critical BasicConstraints in CA"
        )

    is_ca = basic_constraints.value.ca  # type: ignore[attr-defined]

    try:
        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage")

    can_sign_certs = key_usage.value.key_cert_sign  # type: ignore[attr-defined]

    # Both indicators set: a CA.
    if is_ca and can_sign_certs:
        return True

    # Neither indicator set: not a CA.
    if not is_ca and not can_sign_certs:
        return False

    # Exactly one indicator set: an inconsistent state that should never occur.
    raise VerificationError(
        f"invalid X.509 certificate states: KeyUsage.keyCertSign={can_sign_certs}"
        f", BasicConstraints.ca={is_ca}"
    )

269 

270 

def cert_is_root_ca(cert: Certificate) -> bool:
    """
    Report whether `cert` presents itself as a root CA, i.e. it is a CA
    certificate that verifies as directly issued by itself.

    Raises `VerificationError` if the certificate is not X.509 v3 (or if
    `cert_is_ca` rejects it).

    This is **not** a verification function: it does not establish that
    the certificate is trustworthy.
    """

    # NOTE(ww): Deliberately verbose so that each state is spelled out.

    # Sigstore only deals in v3 certificates: older X.509 versions have
    # no extensions and leave CA status ambiguous.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # A certificate that isn't a CA can't be a root CA.
    if not cert_is_ca(cert):
        return False

    # A root CA is its own issuer and signer.
    try:
        cert.verify_directly_issued_by(cert)
    except Exception:
        return False
    else:
        return True

299 

300 

def cert_is_leaf(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` is a valid
    leaf certificate for Sigstore purposes. This means that:

    * It is not a root or intermediate CA;
    * It has `KeyUsage.digitalSignature`;
    * It has `CODE_SIGNING` as an `ExtendedKeyUsage`.

    Returns `False` for CA certificates, and for certificates whose
    `ExtendedKeyUsage` does not include `CODE_SIGNING`.

    Raises `VerificationError` if the certificate is not X.509 v3, if it
    has no `KeyUsage` or no `ExtendedKeyUsage` extension, or if its
    `KeyUsage` lacks `digitalSignature` (or if `cert_is_ca` rejects it).

    This is **not** a verification function, and it does not establish
    the trustworthiness of the given certificate.
    """

    # Only v3 certificates should appear in the context of Sigstore;
    # earlier versions of X.509 lack extensions and have ambiguous CA
    # behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # CAs are not leaves.
    if cert_is_ca(cert):
        return False

    # `cert_is_ca` returns early, without consulting KeyUsage, when the
    # certificate has no BasicConstraints. KeyUsage can therefore still be
    # absent here, and that must surface as a `VerificationError` like
    # every other missing-extension case rather than a raw
    # `ExtensionNotFound`.
    try:
        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage")

    digital_signature = key_usage.value.digital_signature  # type: ignore[attr-defined]

    if not digital_signature:
        raise VerificationError(
            "invalid certificate for Sigstore purposes: missing digital signature usage"
        )

    # Finally, we check to make sure the leaf has an `ExtendedKeyUsages`
    # extension that includes a codesigning entitlement. Sigstore should
    # never issue a leaf that doesn't have this extended usage.
    try:
        extended_key_usage = cert.extensions.get_extension_for_oid(
            ExtensionOID.EXTENDED_KEY_USAGE
        )
    except ExtensionNotFound:
        raise VerificationError("invalid X.509 certificate: missing ExtendedKeyUsage")

    return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value  # type: ignore[operator]