Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_utils.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

121 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Shared utilities. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import hashlib 

23import sys 

24from datetime import datetime, timezone 

25from typing import IO, NewType, Union 

26from urllib import parse 

27 

28from cryptography.hazmat.primitives import serialization 

29from cryptography.hazmat.primitives.asymmetric import ec, ed25519, rsa 

30from cryptography.x509 import ( 

31 Certificate, 

32 ExtensionNotFound, 

33 Version, 

34 load_der_x509_certificate, 

35) 

36from cryptography.x509.oid import ExtendedKeyUsageOID, ExtensionOID 

37from sigstore_models.common.v1 import HashAlgorithm, TimeRange 

38 

39from sigstore import hashes as sigstore_hashes 

40from sigstore.errors import VerificationError 

41 

42if sys.version_info < (3, 11): 

43 import importlib_resources as resources 

44else: 

45 from importlib import resources 

46 

47 

# Union of the public key classes that `load_pem_public_key` and
# `load_der_public_key` accept by default.
PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey, ed25519.Ed25519PublicKey]

# Union of the class objects (not instances) for each member of `PublicKey`;
# the key loaders in this module take a tuple of these as their `types` filter.
PublicKeyTypes = Union[
    type[rsa.RSAPublicKey],
    type[ec.EllipticCurvePublicKey],
    type[ed25519.Ed25519PublicKey],
]

HexStr = NewType("HexStr", str)
"""
A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`).
"""
B64Str = NewType("B64Str", str)
"""
A newtype for `str` objects that contain base64 encoded strings.
"""
KeyID = NewType("KeyID", bytes)
"""
A newtype for `bytes` objects that contain a key id.
"""

68 

69 

def load_pem_public_key(
    key_pem: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
        ed25519.Ed25519PublicKey,
    ),
) -> PublicKey:
    """
    Load a PEM-encoded public key, restricted to the key classes in `types`.

    This wraps `cryptography`'s `serialization.load_pem_public_key` so that
    every failure surfaces as a `VerificationError`: both a failure to parse
    `key_pem` and a successfully parsed key whose class is not in `types`.
    """

    # Any parser failure is collapsed into the uniform error type; the
    # original exception stays reachable through `__cause__`.
    try:
        loaded = serialization.load_pem_public_key(key_pem)
    except Exception as exc:
        raise VerificationError("could not load PEM-formatted public key") from exc

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    raise VerificationError(f"invalid key format: not one of {types}")

94 

95 

def load_der_public_key(
    key_der: bytes,
    *,
    types: tuple[PublicKeyTypes, ...] = (
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
        ed25519.Ed25519PublicKey,
    ),
) -> PublicKey:
    """
    Load a DER-encoded public key, restricted to the key classes in `types`.

    This is the DER counterpart of `load_pem_public_key`: a failure to parse
    `key_der`, or a parsed key whose class is not in `types`, is reported as
    a `VerificationError`.
    """

    # Any parser failure is collapsed into the uniform error type; the
    # original exception stays reachable through `__cause__`.
    try:
        loaded = serialization.load_der_public_key(key_der)
    except Exception as exc:
        raise VerificationError("could not load DER-formatted public key") from exc

    if isinstance(loaded, types):
        return loaded  # type: ignore[return-value]

    raise VerificationError(f"invalid key format: not one of {types}")

118 

119 

def base64_encode_pem_cert(cert: Certificate) -> B64Str:
    """
    Serialize `cert` as PEM, then base64-encode that PEM text and return
    the result as a string.
    """

    pem_bytes = cert.public_bytes(serialization.Encoding.PEM)
    encoded = base64.b64encode(pem_bytes)
    return B64Str(encoded.decode())

128 

129 

def cert_der_to_pem(der: bytes) -> str:
    """
    Re-encode a DER-encoded X.509 certificate as PEM.

    Returns the PEM encoding as a string.
    """

    # NOTE: Parsing the certificate is not strictly required, because DER to
    # PEM is a purely mechanical transformation; the round-trip through the
    # parser is kept as-is.
    pem_bytes = load_der_x509_certificate(der).public_bytes(serialization.Encoding.PEM)
    return pem_bytes.decode()

141 

142 

def key_id(key: PublicKey) -> KeyID:
    """
    Compute the RFC 6962-style "key ID" of `key`: the SHA-256 digest of its
    DER-encoded SubjectPublicKeyInfo.

    See: <https://www.rfc-editor.org/rfc/rfc6962#section-3.2>
    """
    hasher = hashlib.sha256()
    hasher.update(
        key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
    )
    return KeyID(hasher.digest())

155 

156 

def sha256_digest(
    input_: bytes | IO[bytes] | sigstore_hashes.Hashed,
) -> sigstore_hashes.Hashed:
    """
    Produce a SHA256 `Hashed` for the given input.

    A `Hashed` input is returned unchanged; a `bytes` buffer or a binary
    stream is digested and wrapped in a new `Hashed`.
    """
    # Pre-hashed inputs pass straight through.
    if isinstance(input_, sigstore_hashes.Hashed):
        return input_

    if isinstance(input_, bytes):
        # The data is already in memory, so hash it in one shot rather than
        # routing it through the streaming reader.
        raw_digest = hashlib.sha256(input_).digest()
    else:
        raw_digest = _sha256_streaming(input_)

    return sigstore_hashes.Hashed(digest=raw_digest, algorithm=HashAlgorithm.SHA2_256)

177 

178 

def _sha256_streaming(io: IO[bytes]) -> bytes:
    """
    Compute the SHA256 of a stream and return the raw digest bytes.

    The stream is consumed from its current position until it reports
    end-of-stream. This function does its own internal buffering, so an
    unbuffered stream should be supplied for optimal performance.

    Streams that provide `readinto` are read into a reusable buffer; streams
    that only provide `read` are consumed in chunks of the same size.
    """

    # NOTE: This function performs a SHA256 digest over a stream.
    # The stream's size is not checked, meaning that the stream's source
    # is implicitly trusted: if an attacker is able to truncate the stream's
    # source prematurely, then they could conceivably produce a digest
    # for a partial stream. This in turn could conceivably result
    # in a valid signature for an unintended (truncated) input.
    #
    # This is currently outside of sigstore-python's threat model: we
    # assume that the stream is trusted.
    #
    # See: https://github.com/sigstore/sigstore-python/pull/329#discussion_r1041215972

    sha256 = hashlib.sha256()
    # Per coreutils' ioblksize.h: 128KB performs optimally across a range
    # of systems in terms of minimizing syscall overhead.
    chunk_size = 128 * 1024

    # `typing.IO` does not declare `readinto`, so a conforming stream may
    # only offer `read`; fall back to chunked reads in that case instead of
    # failing with an `AttributeError`.
    readinto = getattr(io, "readinto", None)
    if readinto is None:
        while chunk := io.read(chunk_size):
            sha256.update(chunk)
        return sha256.digest()

    # Reuse a single buffer for every read to avoid per-chunk allocations.
    view = memoryview(bytearray(chunk_size))
    while nbytes := readinto(view):
        sha256.update(view[:nbytes])

    return sha256.digest()

210 

211 

def read_embedded(name: str, url: str) -> bytes:
    """
    Return the bytes of the resource `name` embedded in this distribution
    of sigstore-python for the TUF repository identified by `url`.
    """
    # The per-repository directory is named after the fully percent-encoded
    # URL (no characters are treated as safe).
    quoted_url = parse.quote(url, safe="")
    resource = resources.files("sigstore._store").joinpath(quoted_url, name)
    contents: bytes = resource.read_bytes()
    return contents

220 

221 

def cert_is_ca(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate`
    is a CA certificate.

    Only the certificate's interior state is inspected; nothing here says
    whether the certificate is trustworthy.

    A `False` result does **not** mean the certificate is a valid leaf:
    users **must** call the dedicated `cert_is_leaf` utility to check
    whether a particular leaf upholds Sigstore's invariants.

    Raises `VerificationError` if the certificate is not X.509 v3, if its
    BasicConstraints extension is not critical, if BasicConstraints is
    present but KeyUsage is missing, or if `BasicConstraints.ca` and
    `KeyUsage.keyCertSign` disagree.
    """

    # Only v3 certificates should appear in the context of Sigstore;
    # earlier versions of X.509 lack extensions and have ambiguous CA
    # behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # Valid CA certificates must have both of the following set:
    #
    # * `BasicConstraints.ca`
    # * `KeyUsage.keyCertSign`
    #
    # Having exactly one of them set is inconsistent, and such a certificate
    # is rejected outright rather than treated as a non-CA leaf.

    try:
        basic_constraints = cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS
        )
    except ExtensionNotFound:
        # No BasicConstraints means that this can't possibly be a CA.
        return False

    # BasicConstraints must be marked as critical, per RFC 5280 4.2.1.9.
    if not basic_constraints.critical:
        raise VerificationError(
            "invalid X.509 certificate: non-critical BasicConstraints in CA"
        )

    ca = basic_constraints.value.ca  # type: ignore[attr-defined]

    try:
        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage")

    key_cert_sign = key_usage.value.key_cert_sign  # type: ignore[attr-defined]

    # Both set: this is a CA.
    if ca and key_cert_sign:
        return True

    # Exactly one set: an invalid state that should never occur.
    if ca or key_cert_sign:
        raise VerificationError(
            f"invalid X.509 certificate states: KeyUsage.keyCertSign={key_cert_sign}"
            f", BasicConstraints.ca={ca}"
        )

    # Neither set: not a CA.
    return False

284 

285 

def cert_is_root_ca(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` presents itself
    as a root CA, i.e. it is a CA certificate that is directly issued and
    signed by itself.

    This is **not** a verification function, and it does not establish
    the trustworthiness of the given certificate.

    Raises `VerificationError` if the certificate is not X.509 v3, or if
    `cert_is_ca` rejects it as malformed.
    """

    # NOTE(ww): Each state is spelled out explicitly, which makes this
    # function longer than strictly necessary.

    # Only v3 certificates should appear in the context of Sigstore;
    # earlier versions of X.509 lack extensions and have ambiguous CA
    # behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # Non-CAs can't possibly be root CAs.
    if not cert_is_ca(cert):
        return False

    # A certificate that is its own issuer and signer is considered a root
    # CA; any failure of the self-issuance check means it is not one.
    try:
        cert.verify_directly_issued_by(cert)
    except Exception:
        return False
    else:
        return True

314 

315 

def cert_is_leaf(cert: Certificate) -> bool:
    """
    Returns `True` if and only if the given `Certificate` is a valid
    leaf certificate for Sigstore purposes. This means that:

    * It is not a root or intermediate CA;
    * It has `KeyUsage.digitalSignature`;
    * It has `CODE_SIGNING` as an `ExtendedKeyUsage`.

    This is **not** a verification function, and it does not establish
    the trustworthiness of the given certificate.

    Returns `False` for CA certificates and for certificates whose
    ExtendedKeyUsage does not include `CODE_SIGNING`.

    Raises `VerificationError` if the certificate is not X.509 v3, if
    `cert_is_ca` rejects it as malformed, if the KeyUsage or
    ExtendedKeyUsage extension is missing, or if KeyUsage lacks
    `digitalSignature`.
    """

    # Only v3 certificates should appear in the context of Sigstore;
    # earlier versions of X.509 lack extensions and have ambiguous CA
    # behavior.
    if cert.version != Version.v3:
        raise VerificationError(f"invalid X.509 version: {cert.version}")

    # CAs are not leaves.
    if cert_is_ca(cert):
        return False

    # `cert_is_ca` returns early, without looking at KeyUsage, when the
    # certificate has no BasicConstraints. KeyUsage can therefore still be
    # absent here, and that must surface as a `VerificationError` rather than
    # a raw `ExtensionNotFound`.
    try:
        key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
    except ExtensionNotFound as exc:
        raise VerificationError("invalid X.509 certificate: missing KeyUsage") from exc

    digital_signature = key_usage.value.digital_signature  # type: ignore[attr-defined]

    if not digital_signature:
        raise VerificationError(
            "invalid certificate for Sigstore purposes: missing digital signature usage"
        )

    # Finally, we check to make sure the leaf has an `ExtendedKeyUsages`
    # extension that includes a codesigning entitlement. Sigstore should
    # never issue a leaf that doesn't have this extended usage.
    try:
        extended_key_usage = cert.extensions.get_extension_for_oid(
            ExtensionOID.EXTENDED_KEY_USAGE
        )
    except ExtensionNotFound as exc:
        raise VerificationError(
            "invalid X.509 certificate: missing ExtendedKeyUsage"
        ) from exc

    return ExtendedKeyUsageOID.CODE_SIGNING in extended_key_usage.value  # type: ignore[operator]

358 

359 

def is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
    """
    Check the current UTC time against the validity window `period`.

    A missing `period` is always valid. Otherwise the current time must not
    be before `period.start`; and, unless `allow_expired` is set, it must
    also not be after `period.end` (an `end` of `None` never expires).
    """
    # If there was no validity period specified, the key is always valid.
    if not period:
        return True

    current_time = datetime.now(timezone.utc)

    # Not yet active: the window has not opened.
    if current_time < period.start:
        return False

    # Past the start: valid if expiry is tolerated, if the window is
    # open-ended, or if the end has not been reached yet.
    return allow_expired or period.end is None or current_time <= period.end