Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_utils.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

113 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Shared utilities. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import hashlib 

23import sys 

24from typing import IO, NewType, Union 

25from urllib import parse 

26 

27from cryptography.hazmat.primitives import serialization 

28from cryptography.hazmat.primitives.asymmetric import ec, ed25519, rsa 

29from cryptography.x509 import ( 

30 Certificate, 

31 ExtensionNotFound, 

32 Version, 

33 load_der_x509_certificate, 

34) 

35from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID 

36from sigstore_protobuf_specs.dev.sigstore.common.v1 import HashAlgorithm 

37 

38from sigstore import hashes as sigstore_hashes 

39from sigstore.errors import VerificationError 

40 

41if sys.version_info < (3, 11): 

42 import importlib_resources as resources 

43else: 

44 from importlib import resources 

45 

46 

47PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey, ed25519.Ed25519PublicKey] 

48 

49PublicKeyTypes = Union[ 

50 type[rsa.RSAPublicKey], 

51 type[ec.EllipticCurvePublicKey], 

52 type[ed25519.Ed25519PublicKey], 

53] 

54 

55HexStr = NewType("HexStr", str) 

56""" 

57A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`). 

58""" 

59B64Str = NewType("B64Str", str) 

60""" 

61A newtype for `str` objects that contain base64 encoded strings. 

62""" 

63KeyID = NewType("KeyID", bytes) 

64""" 

65A newtype for `bytes` objects that contain a key id. 

66""" 

67 

68 

69def load_pem_public_key( 

70 key_pem: bytes, 

71 *, 

72 types: tuple[PublicKeyTypes, ...] = ( 

73 rsa.RSAPublicKey, 

74 ec.EllipticCurvePublicKey, 

75 ed25519.Ed25519PublicKey, 

76 ), 

77) -> PublicKey: 

78 """ 

79 A specialization of `cryptography`'s `serialization.load_pem_public_key` 

80 with a uniform exception type (`VerificationError`) and filtering on valid key types 

81 for Sigstore purposes. 

82 """ 

83 

84 try: 

85 key = serialization.load_pem_public_key(key_pem) 

86 except Exception as exc: 

87 raise VerificationError("could not load PEM-formatted public key") from exc 

88 

89 if not isinstance(key, types): 

90 raise VerificationError(f"invalid key format: not one of {types}") 

91 

92 return key # type: ignore[return-value] 

93 

94 

95def load_der_public_key( 

96 key_der: bytes, 

97 *, 

98 types: tuple[PublicKeyTypes, ...] = ( 

99 rsa.RSAPublicKey, 

100 ec.EllipticCurvePublicKey, 

101 ed25519.Ed25519PublicKey, 

102 ), 

103) -> PublicKey: 

104 """ 

105 The `load_pem_public_key` specialization, but DER. 

106 """ 

107 

108 try: 

109 key = serialization.load_der_public_key(key_der) 

110 except Exception as exc: 

111 raise VerificationError("could not load DER-formatted public key") from exc 

112 

113 if not isinstance(key, types): 

114 raise VerificationError(f"invalid key format: not one of {types}") 

115 

116 return key # type: ignore[return-value] 

117 

118 

119def base64_encode_pem_cert(cert: Certificate) -> B64Str: 

120 """ 

121 Returns a string containing a base64-encoded PEM-encoded X.509 certificate. 

122 """ 

123 

124 return B64Str( 

125 base64.b64encode(cert.public_bytes(serialization.Encoding.PEM)).decode() 

126 ) 

127 

128 

129def cert_der_to_pem(der: bytes) -> str: 

130 """ 

131 Converts a DER-encoded X.509 certificate into its PEM encoding. 

132 

133 Returns a string containing a PEM-encoded X.509 certificate. 

134 """ 

135 

136 # NOTE: Technically we don't have to round-trip like this, since 

137 # the DER-to-PEM transformation is entirely mechanical. 

138 cert = load_der_x509_certificate(der) 

139 return cert.public_bytes(serialization.Encoding.PEM).decode() 

140 

141 

142def key_id(key: PublicKey) -> KeyID: 

143 """ 

144 Returns an RFC 6962-style "key ID" for the given public key. 

145 

146 See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2> 

147 """ 

148 public_bytes = key.public_bytes( 

149 encoding=serialization.Encoding.DER, 

150 format=serialization.PublicFormat.SubjectPublicKeyInfo, 

151 ) 

152 

153 return KeyID(hashlib.sha256(public_bytes).digest()) 

154 

155 

156def sha256_digest( 

157 input_: bytes | IO[bytes] | sigstore_hashes.Hashed, 

158) -> sigstore_hashes.Hashed: 

159 """ 

160 Compute the SHA256 digest of an input stream or buffer or, 

161 if given a `Hashed`, return it directly. 

162 """ 

163 if isinstance(input_, sigstore_hashes.Hashed): 

164 return input_ 

165 

166 # If the input is already buffered into memory, there's no point in 

167 # going back through an I/O abstraction. 

168 if isinstance(input_, bytes): 

169 return sigstore_hashes.Hashed( 

170 digest=hashlib.sha256(input_).digest(), algorithm=HashAlgorithm.SHA2_256 

171 ) 

172 

173 return sigstore_hashes.Hashed( 

174 digest=_sha256_streaming(input_), algorithm=HashAlgorithm.SHA2_256 

175 ) 

176 

177 

178def _sha256_streaming(io: IO[bytes]) -> bytes: 

179 """ 

180 Compute the SHA256 of a stream. 

181 

182 This function does its own internal buffering, so an unbuffered stream 

183 should be supplied for optimal performance. 

184 """ 

185 

186 # NOTE: This function performs a SHA256 digest over a stream. 

187 # The stream's size is not checked, meaning that the stream's source 

188 # is implicitly trusted: if an attacker is able to truncate the stream's 

189 # source prematurely, then they could conceivably produce a digest 

190 # for a partial stream. This in turn could conceivably result 

191 # in a valid signature for an unintended (truncated) input. 

192 # 

193 # This is currently outside of sigstore-python's threat model: we 

194 # assume that the stream is trusted. 

195 # 

196 # See: https://github.com/sigstore/sigstore-python/pull/329#discussion_r1041215972 

197 

198 sha256 = hashlib.sha256() 

199 # Per coreutils' ioblksize.h: 128KB performs optimally across a range 

200 # of systems in terms of minimizing syscall overhead. 

201 view = memoryview(bytearray(128 * 1024)) 

202 

203 nbytes = io.readinto(view) # type: ignore[attr-defined] 

204 while nbytes: 

205 sha256.update(view[:nbytes]) 

206 nbytes = io.readinto(view) # type: ignore[attr-defined] 

207 

208 return sha256.digest() 

209 

210 

211def read_embedded(name: str, url: str) -> bytes: 

212 """ 

213 Read a resource for a given TUF repository embedded in this distribution 

214 of sigstore-python, returning its contents as bytes. 

215 """ 

216 embed_dir = parse.quote(url, safe="") 

217 b: bytes = resources.files("sigstore._store").joinpath(embed_dir, name).read_bytes() 

218 return b 

219 

220 

221def cert_is_ca(cert: Certificate) -> bool: 

222 """ 

223 Returns `True` if and only if the given `Certificate` 

224 is a CA certificate. 

225 

226 This function doesn't indicate the trustworthiness of the given 

227 `Certificate`, only whether it has the appropriate interior state. 

228 

229 This function is **not** naively invertible: users **must** use the 

230 dedicated `cert_is_leaf` utility function to determine whether a particular 

231 leaf upholds Sigstore's invariants. 

232 """ 

233 

234 # Only v3 certificates should appear in the context of Sigstore; 

235 # earlier versions of X.509 lack extensions and have ambiguous CA 

236 # behavior. 

237 if cert.version != Version.v3: 

238 raise VerificationError(f"invalid X.509 version: {cert.version}") 

239 

240 # Valid CA certificates must have the following set: 

241 # 

242 # * `BasicKeyUsage.keyCertSign` 

243 # * `BasicConstraints.ca` 

244 # 

245 # Any other combination of states is inconsistent and invalid, meaning 

246 # that we won't consider the certificate a valid non-CA leaf. 

247 

248 try: 

249 basic_constraints = cert.extensions.get_extension_for_oid( 

250 ExtensionOID.BASIC_CONSTRAINTS 

251 ) 

252 

253 # BasicConstraints must be marked as critical, per RFC 5280 4.2.1.9. 

254 if not basic_constraints.critical: 

255 raise VerificationError( 

256 "invalid X.509 certificate: non-critical BasicConstraints in CA" 

257 ) 

258 

259 ca = basic_constraints.value.ca # type: ignore[attr-defined] 

260 except ExtensionNotFound: 

261 # No BasicConstrains means that this can't possibly be a CA. 

262 return False 

263 

264 key_cert_sign = False 

265 try: 

266 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) 

267 key_cert_sign = key_usage.value.key_cert_sign # type: ignore[attr-defined] 

268 except ExtensionNotFound: 

269 raise VerificationError("invalid X.509 certificate: missing KeyUsage") 

270 

271 # If both states are set, this is a CA. 

272 if ca and key_cert_sign: 

273 return True 

274 

275 if not (ca or key_cert_sign): 

276 return False 

277 

278 # Anything else is an invalid state that should never occur. 

279 raise VerificationError( 

280 f"invalid X.509 certificate states: KeyUsage.keyCertSign={key_cert_sign}" 

281 f", BasicConstraints.ca={ca}" 

282 ) 

283 

284 

285def cert_is_root_ca(cert: Certificate) -> bool: 

286 """ 

287 Returns `True` if and only if the given `Certificate` indicates 

288 that it's a root CA. 

289 

290 This is **not** a verification function, and it does not establish 

291 the trustworthiness of the given certificate. 

292 """ 

293 

294 # NOTE(ww): This function is obnoxiously long to make the different 

295 # states explicit. 

296 

297 # Only v3 certificates should appear in the context of Sigstore; 

298 # earlier versions of X.509 lack extensions and have ambiguous CA 

299 # behavior. 

300 if cert.version != Version.v3: 

301 raise VerificationError(f"invalid X.509 version: {cert.version}") 

302 

303 # Non-CAs can't possibly be root CAs. 

304 if not cert_is_ca(cert): 

305 return False 

306 

307 # A certificate that is its own issuer and signer is considered a root CA. 

308 try: 

309 cert.verify_directly_issued_by(cert) 

310 return True 

311 except Exception: 

312 return False 

313 

314 

315def cert_is_leaf(cert: Certificate) -> bool: 

316 """ 

317 Returns `True` if and only if the given `Certificate` is a valid 

318 leaf certificate for Sigstore purposes. This means that: 

319 

320 * It is not a root or intermediate CA; 

321 * It has `KeyUsage.digitalSignature`; 

322 * It has `CODE_SIGNING` as an `ExtendedKeyUsage`. 

323 

324 This is **not** a verification function, and it does not establish 

325 the trustworthiness of the given certificate. 

326 """ 

327 

328 # Only v3 certificates should appear in the context of Sigstore; 

329 # earlier versions of X.509 lack extensions and have ambiguous CA 

330 # behavior. 

331 if cert.version != Version.v3: 

332 raise VerificationError(f"invalid X.509 version: {cert.version}") 

333 

334 # CAs are not leaves. 

335 if cert_is_ca(cert): 

336 return False 

337 

338 key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) 

339 digital_signature = key_usage.value.digital_signature # type: ignore[attr-defined] 

340 

341 if not digital_signature: 

342 raise VerificationError( 

343 "invalid certificate for Sigstore purposes: missing digital signature usage" 

344 ) 

345 

346 # Finally, we check to make sure the leaf has an `ExtendedKeyUsages` 

347 # extension that includes a codesigning entitlement. Sigstore should 

348 # never issue a leaf that doesn't have this extended usage. 

349 try: 

350 extended_key_usage = cert.extensions.get_extension_for_oid( 

351 ExtensionOID.EXTENDED_KEY_USAGE 

352 ) 

353 

354 return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value # type: ignore[operator] 

355 except ExtensionNotFound: 

356 raise VerificationError("invalid X.509 certificate: missing ExtendedKeyUsage")