Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/ngclient/_internal/trusted_metadata_set.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

153 statements  

1# Copyright the TUF contributors 

2# SPDX-License-Identifier: MIT OR Apache-2.0 

3 

4"""Trusted collection of client-side TUF Metadata. 

5 

6``TrustedMetadataSet`` keeps track of the current valid set of metadata for the 

7client, and handles almost every step of the "Detailed client workflow" ( 

8https://theupdateframework.github.io/specification/latest#detailed-client-workflow) 

9in the TUF specification: the remaining steps are related to filesystem and 

10network IO, which are not handled here. 

11 

12Loaded metadata can be accessed via index access with rolename as key 

13(``trusted_set[Root.type]``) or, in the case of top-level metadata, using the 

14helper properties (``trusted_set.root``). 

15 

16Signatures are verified and discarded upon inclusion into the trusted set. 

17 

18The rules that ``TrustedMetadataSet`` follows for top-level metadata are 

19 * Metadata must be loaded in order: 

20 root -> timestamp -> snapshot -> targets -> (delegated targets). 

21 * Metadata can be loaded even if it is expired (or in the snapshot case if the 

22 meta info does not match): this is called "intermediate metadata". 

23 * Intermediate metadata can _only_ be used to load newer versions of the 

24 same metadata: As an example an expired root can be used to load a new root. 

25 * Metadata is loadable only if metadata before it in loading order is loaded 

26 (and is not intermediate): As an example timestamp can be loaded if a 

27 final (non-expired) root has been loaded. 

28 * Metadata is not loadable if any metadata after it in loading order has been 

29 loaded: As an example new roots cannot be loaded if timestamp is loaded. 

30 

31Exceptions are raised if metadata fails to load in any way. 

32 

33Example of loading root, timestamp and snapshot: 

34 

35>>> # Load local root (RepositoryErrors here stop the update) 

36>>> with open(root_path, "rb") as f: 

37>>> trusted_set = TrustedMetadataSet(f.read(), EnvelopeType.METADATA) 

38>>> 

39>>> # update root from remote until no more are available 

40>>> with download(Root.type, trusted_set.root.version + 1) as f: 

41>>> trusted_set.update_root(f.read()) 

42>>> 

43>>> # load local timestamp, then update from remote 

44>>> try: 

45>>> with open(timestamp_path, "rb") as f: 

46>>> trusted_set.update_timestamp(f.read()) 

47>>> except (RepositoryError, OSError): 

48>>> pass # failure to load a local file is ok 

49>>> 

50>>> with download(Timestamp.type) as f: 

51>>> trusted_set.update_timestamp(f.read()) 

52>>> 

53>>> # load local snapshot, then update from remote if needed 

54>>> try: 

55>>> with open(snapshot_path, "rb") as f: 

56>>> trusted_set.update_snapshot(f.read()) 

57>>> except (RepositoryError, OSError): 

58>>> # local snapshot is not valid, load from remote 

59>>> # (RepositoryErrors here stop the update) 

60>>> with download(Snapshot.type, version) as f: 

61>>> trusted_set.update_snapshot(f.read()) 

62""" 

63 

64from __future__ import annotations 

65 

66import datetime 

67import logging 

68from collections import abc 

69from typing import TYPE_CHECKING, Union, cast 

70 

71from tuf.api import exceptions 

72from tuf.api.dsse import SimpleEnvelope 

73from tuf.api.metadata import ( 

74 Metadata, 

75 Root, 

76 Signed, 

77 Snapshot, 

78 T, 

79 Targets, 

80 Timestamp, 

81) 

82from tuf.ngclient.config import EnvelopeType 

83 

84if TYPE_CHECKING: 

85 from collections.abc import Iterator 

86 

87 from securesystemslib.signer import Signature 

88 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Payload types whose ``verify_delegate()`` is used in this module to verify
# the signatures of other metadata.
Delegator = Union[Root, Targets]

92 

93 

class TrustedMetadataSet(abc.Mapping):
    """Internal class to keep track of trusted metadata in ``Updater``.

    ``TrustedMetadataSet`` ensures that the collection of metadata in it is
    valid and trusted through the whole client update workflow. It provides
    easy ways to update the metadata with the caller making decisions on
    what is updated.

    Loaded payloads are stored by role name and are reachable through index
    access (``trusted_set[role_name]``) or, for top-level roles, through the
    ``root``/``timestamp``/``snapshot``/``targets`` properties.
    """

    def __init__(self, root_data: bytes, envelope_type: EnvelopeType):
        """Initialize ``TrustedMetadataSet`` by loading trusted root metadata.

        Args:
            root_data: Trusted root metadata as bytes. Note that this metadata
                will only be verified by itself: it is the source of trust for
                all metadata in the ``TrustedMetadataSet``
            envelope_type: Configures deserialization and verification mode of
                TUF metadata.

        Raises:
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.
        """
        self._trusted_set: dict[str, Signed] = {}
        # Single point in time used for every expiry check made by this set
        self.reference_time = datetime.datetime.now(datetime.timezone.utc)

        # Pick the loader matching the configured envelope format
        if envelope_type is EnvelopeType.SIMPLE:
            self._load_data = _load_from_simple_envelope
        else:
            self._load_data = _load_from_metadata

        # Load and validate the local root metadata. Valid initial trusted root
        # metadata is required
        logger.debug("Updating initial trusted root")
        self._load_trusted_root(root_data)

    def __getitem__(self, role: str) -> Signed:
        """Return current ``Signed`` for ``role``.

        Raises:
            KeyError: No metadata has been loaded for ``role``.
        """
        return self._trusted_set[role]

    def __len__(self) -> int:
        """Return number of ``Signed`` objects in ``TrustedMetadataSet``."""
        return len(self._trusted_set)

    def __iter__(self) -> Iterator[Signed]:
        """Return iterator over ``Signed`` objects in
        ``TrustedMetadataSet``.

        Note that this yields the stored values, not the role names, so the
        ``Mapping`` mixin views that are built on key iteration (``keys()``,
        ``items()``, ``values()``) do not behave like their ``dict``
        counterparts.
        """
        return iter(self._trusted_set.values())

    # Helper properties for top level metadata
    @property
    def root(self) -> Root:
        """Get current root."""
        return cast(Root, self._trusted_set[Root.type])

    @property
    def timestamp(self) -> Timestamp:
        """Get current timestamp."""
        return cast(Timestamp, self._trusted_set[Timestamp.type])

    @property
    def snapshot(self) -> Snapshot:
        """Get current snapshot."""
        return cast(Snapshot, self._trusted_set[Snapshot.type])

    @property
    def targets(self) -> Targets:
        """Get current top-level targets."""
        return cast(Targets, self._trusted_set[Targets.type])

    # Methods for updating metadata
    def update_root(self, data: bytes) -> Root:
        """Verify and load ``data`` as new root metadata.

        Note that an expired intermediate root is considered valid: expiry is
        only checked for the final root in ``update_timestamp()``.

        Args:
            data: Unverified new root metadata as bytes

        Raises:
            RuntimeError: This function is called after updating timestamp.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified ``Root`` object
        """
        if Timestamp.type in self._trusted_set:
            raise RuntimeError("Cannot update root after timestamp")
        logger.debug("Updating root")

        # The new root must be signed by the currently trusted root ...
        new_root, new_root_bytes, new_root_signatures = self._load_data(
            Root, data, self.root
        )
        # ... and must be exactly the next version
        if new_root.version != self.root.version + 1:
            raise exceptions.BadVersionNumberError(
                f"Expected root version {self.root.version + 1}"
                f" instead got version {new_root.version}"
            )

        # Verify that new root is signed by itself
        new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures)

        self._trusted_set[Root.type] = new_root
        logger.debug("Updated root v%d", new_root.version)

        return new_root

    def update_timestamp(self, data: bytes) -> Timestamp:
        """Verify and load ``data`` as new timestamp metadata.

        Note that an intermediate timestamp is allowed to be expired:
        ``TrustedMetadataSet`` will throw an ``ExpiredMetadataError`` in
        this case but the intermediate timestamp will be loaded. This way
        a newer timestamp can still be loaded (and the intermediate
        timestamp will be used for rollback protection). Expired timestamp
        will prevent loading snapshot metadata.

        Args:
            data: Unverified new timestamp metadata as bytes

        Raises:
            RuntimeError: This function is called after updating snapshot.
            RepositoryError: Metadata failed to load or verify as final
                timestamp. The actual error type and content will contain
                more details.

        Returns:
            Deserialized and verified ``Timestamp`` object
        """
        if Snapshot.type in self._trusted_set:
            raise RuntimeError("Cannot update timestamp after snapshot")

        # client workflow 5.3.10: Make sure final root is not expired.
        if self.root.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError("Final root.json is expired")
        # No need to check for 5.3.11 (fast forward attack recovery):
        # timestamp/snapshot can not yet be loaded at this point

        new_timestamp, _, _ = self._load_data(Timestamp, data, self.root)

        # If an existing trusted timestamp is updated,
        # check for a rollback attack
        if Timestamp.type in self._trusted_set:
            # Prevent rolling back timestamp version
            if new_timestamp.version < self.timestamp.version:
                raise exceptions.BadVersionNumberError(
                    f"New timestamp version {new_timestamp.version} must"
                    f" be >= {self.timestamp.version}"
                )
            # Keep using old timestamp if versions are equal.
            if new_timestamp.version == self.timestamp.version:
                raise exceptions.EqualVersionNumberError

            # Prevent rolling back snapshot version
            snapshot_meta = self.timestamp.snapshot_meta
            new_snapshot_meta = new_timestamp.snapshot_meta
            if new_snapshot_meta.version < snapshot_meta.version:
                raise exceptions.BadVersionNumberError(
                    f"New snapshot version must be >= {snapshot_meta.version}"
                    f", got version {new_snapshot_meta.version}"
                )

        # expiry not checked to allow old timestamp to be used for rollback
        # protection of new timestamp: expiry is checked in update_snapshot()

        self._trusted_set[Timestamp.type] = new_timestamp
        logger.debug("Updated timestamp v%d", new_timestamp.version)

        # timestamp is loaded: raise if it is not valid _final_ timestamp
        self._check_final_timestamp()

        return new_timestamp

    def _check_final_timestamp(self) -> None:
        """Raise if timestamp is expired."""

        if self.timestamp.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError("timestamp.json is expired")

    def update_snapshot(
        self, data: bytes, trusted: bool | None = False
    ) -> Snapshot:
        """Verify and load ``data`` as new snapshot metadata.

        Note that an intermediate snapshot is allowed to be expired and version
        is allowed to not match timestamp meta version: ``TrustedMetadataSet``
        will throw an ``ExpiredMetadataError``/``BadVersionNumberError`` in
        these cases but the intermediate snapshot will be loaded. This way a
        newer snapshot can still be loaded (and the intermediate snapshot will
        be used for rollback protection). Expired snapshot or snapshot that
        does not match timestamp meta version will prevent loading targets.

        Args:
            data: Unverified new snapshot metadata as bytes
            trusted: ``True`` if data has at some point been verified by
                ``TrustedMetadataSet`` as a valid snapshot. Purpose of trusted
                is to allow loading of locally stored snapshot as intermediate
                snapshot even if hashes in current timestamp meta no longer
                match data. Default is False.

        Raises:
            RuntimeError: This function is called before updating timestamp
                or after updating targets.
            RepositoryError: Data failed to load or verify as final snapshot.
                The actual error type and content will contain more details.

        Returns:
            Deserialized and verified ``Snapshot`` object
        """

        if Timestamp.type not in self._trusted_set:
            raise RuntimeError("Cannot update snapshot before timestamp")
        if Targets.type in self._trusted_set:
            raise RuntimeError("Cannot update snapshot after targets")
        logger.debug("Updating snapshot")

        # Snapshot cannot be loaded if final timestamp is expired
        self._check_final_timestamp()

        snapshot_meta = self.timestamp.snapshot_meta

        # Verify non-trusted data against the hashes in timestamp, if any.
        # Trusted snapshot data has already been verified once.
        if not trusted:
            snapshot_meta.verify_length_and_hashes(data)

        new_snapshot, _, _ = self._load_data(Snapshot, data, self.root)

        # version not checked against meta version to allow old snapshot to be
        # used in rollback protection: it is checked when targets is updated

        # If an existing trusted snapshot is updated, check for rollback attack
        if Snapshot.type in self._trusted_set:
            for filename, fileinfo in self.snapshot.meta.items():
                new_fileinfo = new_snapshot.meta.get(filename)

                # Prevent removal of any metadata in meta
                if new_fileinfo is None:
                    raise exceptions.RepositoryError(
                        f"New snapshot is missing info for '{filename}'"
                    )

                # Prevent rollback of any metadata versions: the message
                # reports the trusted version as the required minimum and
                # the new snapshot's version as the offending value
                if new_fileinfo.version < fileinfo.version:
                    raise exceptions.BadVersionNumberError(
                        f"Expected {filename} version >= "
                        f"{fileinfo.version}, got {new_fileinfo.version}."
                    )

        # expiry not checked to allow old snapshot to be used for rollback
        # protection of new snapshot: it is checked when targets is updated

        self._trusted_set[Snapshot.type] = new_snapshot
        logger.debug("Updated snapshot v%d", new_snapshot.version)

        # snapshot is loaded, but we raise if it's not valid _final_ snapshot
        self._check_final_snapshot()

        return new_snapshot

    def _check_final_snapshot(self) -> None:
        """Raise if snapshot is expired or meta version does not match."""

        if self.snapshot.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError("snapshot.json is expired")
        snapshot_meta = self.timestamp.snapshot_meta
        if self.snapshot.version != snapshot_meta.version:
            raise exceptions.BadVersionNumberError(
                f"Expected snapshot version {snapshot_meta.version}, "
                f"got {self.snapshot.version}"
            )

    def update_targets(self, data: bytes) -> Targets:
        """Verify and load ``data`` as new top-level targets metadata.

        Args:
            data: Unverified new targets metadata as bytes

        Raises:
            RuntimeError: This function is called before updating snapshot.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified ``Targets`` object
        """
        # Top-level targets is handled as a role delegated by root
        return self.update_delegated_targets(data, Targets.type, Root.type)

    def update_delegated_targets(
        self, data: bytes, role_name: str, delegator_name: str
    ) -> Targets:
        """Verify and load ``data`` as new metadata for target ``role_name``.

        Args:
            data: Unverified new metadata as bytes
            role_name: Role name of the new metadata
            delegator_name: Name of the role delegating to the new metadata

        Raises:
            RuntimeError: This function is called before updating snapshot
                or before the delegator has been loaded.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified ``Targets`` object
        """
        if Snapshot.type not in self._trusted_set:
            raise RuntimeError("Cannot load targets before snapshot")

        # Targets cannot be loaded if final snapshot is expired or its version
        # does not match meta version in timestamp
        self._check_final_snapshot()

        delegator: Delegator | None = self.get(delegator_name)
        if delegator is None:
            raise RuntimeError("Cannot load targets before delegator")

        logger.debug("Updating %s delegated by %s", role_name, delegator_name)

        # Verify against the hashes in snapshot, if any
        meta = self.snapshot.meta.get(f"{role_name}.json")
        if meta is None:
            raise exceptions.RepositoryError(
                f"Snapshot does not contain information for '{role_name}'"
            )

        meta.verify_length_and_hashes(data)

        new_delegate, _, _ = self._load_data(
            Targets, data, delegator, role_name
        )

        # The loaded version must be the one snapshot lists for this role
        version = new_delegate.version
        if version != meta.version:
            raise exceptions.BadVersionNumberError(
                f"Expected {role_name} v{meta.version}, got v{version}."
            )

        if new_delegate.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")

        self._trusted_set[role_name] = new_delegate
        logger.debug("Updated %s v%d", role_name, version)

        return new_delegate

    def _load_trusted_root(self, data: bytes) -> None:
        """Verify and load ``data`` as trusted root metadata.

        Note that an expired initial root is considered valid: expiry is
        only checked for the final root in ``update_timestamp()``.
        """
        # No delegator is passed: the initial root is only verified by itself
        new_root, new_root_bytes, new_root_signatures = self._load_data(
            Root, data
        )
        new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures)

        self._trusted_set[Root.type] = new_root
        logger.debug("Loaded trusted root v%d", new_root.version)

455 

456 

def _load_from_metadata(
    role: type[T],
    data: bytes,
    delegator: Delegator | None = None,
    role_name: str | None = None,
) -> tuple[T, bytes, dict[str, Signature]]:
    """Deserialize traditional metadata and optionally verify signatures.

    The payload type must match ``role``. Signatures are verified with
    ``delegator.verify_delegate()`` only when a delegator is given; the name
    verified is ``role_name``, falling back to ``role.type``.

    Returns:
        Tuple of deserialized payload, signed payload bytes and signatures.

    Raises:
        RepositoryError: Payload type does not match ``role``.
    """
    metadata = Metadata[T].from_bytes(data)
    payload = metadata.signed

    # Reject payloads of an unexpected type before looking at signatures
    if payload.type != role.type:
        raise exceptions.RepositoryError(
            f"Expected '{role.type}', got '{payload.type}'"
        )

    if delegator:
        verified_name = role.type if role_name is None else role_name
        delegator.verify_delegate(
            verified_name, metadata.signed_bytes, metadata.signatures
        )

    return payload, metadata.signed_bytes, metadata.signatures

482 

483 

def _load_from_simple_envelope(
    role: type[T],
    data: bytes,
    delegator: Delegator | None = None,
    role_name: str | None = None,
) -> tuple[T, bytes, dict[str, Signature]]:
    """Deserialize a simple envelope and optionally verify signatures.

    The envelope payload type must be the default payload type. Signatures
    are verified over ``envelope.pae()`` with ``delegator.verify_delegate()``
    only when a delegator is given; the name verified is ``role_name``,
    falling back to ``role.type``. The payload is extracted after that step
    and its type must match ``role``.

    Returns:
        Tuple of deserialized payload, signed payload bytes and signatures.

    Raises:
        RepositoryError: Envelope payload type or payload type is unexpected.
    """
    envelope = SimpleEnvelope[T].from_bytes(data)

    payload_type_ok = (
        envelope.payload_type == SimpleEnvelope.DEFAULT_PAYLOAD_TYPE
    )
    if not payload_type_ok:
        raise exceptions.RepositoryError(
            f"Expected '{SimpleEnvelope.DEFAULT_PAYLOAD_TYPE}', "
            f"got '{envelope.payload_type}'"
        )

    if delegator:
        verified_name = role.type if role_name is None else role_name
        delegator.verify_delegate(
            verified_name, envelope.pae(), envelope.signatures
        )

    # Payload is only extracted once the signature check (if any) has passed
    payload = envelope.get_signed()
    if payload.type != role.type:
        raise exceptions.RepositoryError(
            f"Expected '{role.type}', got '{payload.type}'"
        )

    return payload, envelope.pae(), envelope.signatures