1# Copyright the TUF contributors
2# SPDX-License-Identifier: MIT OR Apache-2.0
3
4"""Trusted collection of client-side TUF Metadata.
5
6``TrustedMetadataSet`` keeps track of the current valid set of metadata for the
7client, and handles almost every step of the "Detailed client workflow" (
8https://theupdateframework.github.io/specification/latest#detailed-client-workflow)
9in the TUF specification: the remaining steps are related to filesystem and
10network IO, which are not handled here.
11
12Loaded metadata can be accessed via index access with rolename as key
13(``trusted_set[Root.type]``) or, in the case of top-level metadata, using the
14helper properties (``trusted_set.root``).
15
16Signatures are verified and discarded upon inclusion into the trusted set.
17
The rules that ``TrustedMetadataSet`` follows for top-level metadata are:
 * Metadata must be loaded in order:
   root -> timestamp -> snapshot -> targets -> (delegated targets).
 * Metadata can be loaded even if it is expired (or in the snapshot case if the
   meta info does not match): this is called "intermediate metadata".
 * Intermediate metadata can _only_ be used to load newer versions of the
   same metadata: As an example an expired root can be used to load a new root.
 * Metadata is loadable only if metadata before it in loading order is loaded
   (and is not intermediate): As an example timestamp can be loaded if a
   final (non-expired) root has been loaded.
 * Metadata is not loadable if any metadata after it in loading order has been
   loaded: As an example new roots cannot be loaded if timestamp is loaded.
30
31Exceptions are raised if metadata fails to load in any way.
32
33Example of loading root, timestamp and snapshot:
34
>>> # Load local root (RepositoryErrors here stop the update)
>>> with open(root_path, "rb") as f:
>>>     trusted_set = TrustedMetadataSet(f.read(), EnvelopeType.METADATA)
>>>
>>> # update root from remote until no more are available
>>> with download(Root.type, trusted_set.root.version + 1) as f:
>>>     trusted_set.update_root(f.read())
>>>
>>> # load local timestamp, then update from remote
>>> try:
>>>     with open(timestamp_path, "rb") as f:
>>>         trusted_set.update_timestamp(f.read())
>>> except (RepositoryError, OSError):
>>>     pass  # failure to load a local file is ok
>>>
>>> with download(Timestamp.type) as f:
>>>     trusted_set.update_timestamp(f.read())
>>>
>>> # load local snapshot, then update from remote if needed
>>> try:
>>>     with open(snapshot_path, "rb") as f:
>>>         trusted_set.update_snapshot(f.read())
>>> except (RepositoryError, OSError):
>>>     # local snapshot is not valid, load from remote
>>>     # (RepositoryErrors here stop the update)
>>>     with download(Snapshot.type, version) as f:
>>>         trusted_set.update_snapshot(f.read())
62"""
63
64from __future__ import annotations
65
66import datetime
67import logging
68from collections import abc
69from typing import TYPE_CHECKING, Union, cast
70
71from tuf.api import exceptions
72from tuf.api.dsse import SimpleEnvelope
73from tuf.api.metadata import (
74 Metadata,
75 Root,
76 Signed,
77 Snapshot,
78 T,
79 Targets,
80 Timestamp,
81)
82from tuf.ngclient.config import EnvelopeType
83
84if TYPE_CHECKING:
85 from collections.abc import Iterator
86
87 from securesystemslib.signer import Signature
88
# Module-level logger; the update methods below emit debug messages through it.
logger = logging.getLogger(__name__)

# Type of the metadata object that may be passed as ``delegator`` to the
# loader helpers in this module: its ``verify_delegate()`` is used to check
# the signatures of the metadata being loaded.
Delegator = Union[Root, Targets]
92
93
class TrustedMetadataSet(abc.Mapping):
    """Internal class to keep track of trusted metadata in ``Updater``.

    ``TrustedMetadataSet`` ensures that the collection of metadata in it is
    valid and trusted through the whole client update workflow. It provides
    easy ways to update the metadata with the caller making decisions on
    what is updated.

    Loaded metadata is stored by role name and is available through index
    access (``trusted_set[rolename]``) or, for top-level roles, through the
    ``root``, ``timestamp``, ``snapshot`` and ``targets`` properties.
    """

    def __init__(self, root_data: bytes, envelope_type: EnvelopeType):
        """Initialize ``TrustedMetadataSet`` by loading trusted root metadata.

        Args:
            root_data: Trusted root metadata as bytes. Note that this metadata
                will only be verified by itself: it is the source of trust for
                all metadata in the ``TrustedMetadataSet``
            envelope_type: Configures deserialization and verification mode of
                TUF metadata.

        Raises:
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.
        """
        self._trusted_set: dict[str, Signed] = {}
        # All expiry checks in this object are made against this single
        # point in time, captured once at construction.
        self.reference_time = datetime.datetime.now(datetime.timezone.utc)

        # Select the loader matching the configured envelope type; every
        # update method deserializes and verifies through ``_load_data``.
        if envelope_type is EnvelopeType.SIMPLE:
            self._load_data = _load_from_simple_envelope
        else:
            self._load_data = _load_from_metadata

        # Load and validate the local root metadata. Valid initial trusted root
        # metadata is required
        logger.debug("Updating initial trusted root")
        self._load_trusted_root(root_data)

    def __getitem__(self, role: str) -> Signed:
        """Return current ``Signed`` for ``role``."""
        return self._trusted_set[role]

    def __len__(self) -> int:
        """Return number of ``Signed`` objects in ``TrustedMetadataSet``."""
        return len(self._trusted_set)

    def __iter__(self) -> Iterator[Signed]:
        """Return iterator over ``Signed`` objects in
        ``TrustedMetadataSet``.
        """
        # NOTE: iteration yields the stored ``Signed`` values, not the role
        # names. Key-based lookups (``[]``, ``get()``, ``in``) go through
        # ``__getitem__`` and are unaffected; code that expects iteration to
        # produce keys (as for a plain mapping) should not rely on it here.
        return iter(self._trusted_set.values())

    # Helper properties for top level metadata
    @property
    def root(self) -> Root:
        """Get current root."""
        return cast(Root, self._trusted_set[Root.type])

    @property
    def timestamp(self) -> Timestamp:
        """Get current timestamp."""
        return cast(Timestamp, self._trusted_set[Timestamp.type])

    @property
    def snapshot(self) -> Snapshot:
        """Get current snapshot."""
        return cast(Snapshot, self._trusted_set[Snapshot.type])

    @property
    def targets(self) -> Targets:
        """Get current top-level targets."""
        return cast(Targets, self._trusted_set[Targets.type])

    # Methods for updating metadata
    def update_root(self, data: bytes) -> Root:
        """Verify and load ``data`` as new root metadata.

        Note that an expired intermediate root is considered valid: expiry is
        only checked for the final root in ``update_timestamp()``.

        Args:
            data: Unverified new root metadata as bytes

        Raises:
            RuntimeError: This function is called after updating timestamp.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified ``Root`` object
        """
        if Timestamp.type in self._trusted_set:
            raise RuntimeError("Cannot update root after timestamp")
        logger.debug("Updating root")

        # The new root must be signed according to the currently trusted root
        new_root, new_root_bytes, new_root_signatures = self._load_data(
            Root, data, self.root
        )
        # Root versions must be loaded one by one, without gaps
        if new_root.version != self.root.version + 1:
            raise exceptions.BadVersionNumberError(
                f"Expected root version {self.root.version + 1}"
                f" instead got version {new_root.version}"
            )

        # Verify that new root is signed by itself
        new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures)

        self._trusted_set[Root.type] = new_root
        logger.debug("Updated root v%d", new_root.version)

        return new_root

    def update_timestamp(self, data: bytes) -> Timestamp:
        """Verify and load ``data`` as new timestamp metadata.

        Note that an intermediate timestamp is allowed to be expired:
        ``TrustedMetadataSet`` will throw an ``ExpiredMetadataError`` in
        this case but the intermediate timestamp will be loaded. This way
        a newer timestamp can still be loaded (and the intermediate
        timestamp will be used for rollback protection). Expired timestamp
        will prevent loading snapshot metadata.

        Args:
            data: Unverified new timestamp metadata as bytes

        Raises:
            RuntimeError: This function is called after updating snapshot.
            RepositoryError: Metadata failed to load or verify as final
                timestamp. The actual error type and content will contain
                more details.

        Returns:
            Deserialized and verified ``Timestamp`` object
        """
        if Snapshot.type in self._trusted_set:
            raise RuntimeError("Cannot update timestamp after snapshot")
        logger.debug("Updating timestamp")

        # client workflow 5.3.10: Make sure final root is not expired.
        if self.root.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError("Final root.json is expired")
        # No need to check for 5.3.11 (fast forward attack recovery):
        # timestamp/snapshot can not yet be loaded at this point

        new_timestamp, _, _ = self._load_data(Timestamp, data, self.root)

        # If an existing trusted timestamp is updated,
        # check for a rollback attack
        if Timestamp.type in self._trusted_set:
            # Prevent rolling back timestamp version
            if new_timestamp.version < self.timestamp.version:
                raise exceptions.BadVersionNumberError(
                    f"New timestamp version {new_timestamp.version} must"
                    f" be >= {self.timestamp.version}"
                )
            # Keep using old timestamp if versions are equal.
            if new_timestamp.version == self.timestamp.version:
                raise exceptions.EqualVersionNumberError

            # Prevent rolling back snapshot version
            snapshot_meta = self.timestamp.snapshot_meta
            new_snapshot_meta = new_timestamp.snapshot_meta
            if new_snapshot_meta.version < snapshot_meta.version:
                raise exceptions.BadVersionNumberError(
                    f"New snapshot version must be >= {snapshot_meta.version}"
                    f", got version {new_snapshot_meta.version}"
                )

        # expiry not checked to allow old timestamp to be used for rollback
        # protection of new timestamp: expiry is checked in update_snapshot()

        self._trusted_set[Timestamp.type] = new_timestamp
        logger.debug("Updated timestamp v%d", new_timestamp.version)

        # timestamp is loaded: raise if it is not valid _final_ timestamp
        self._check_final_timestamp()

        return new_timestamp

    def _check_final_timestamp(self) -> None:
        """Raise if timestamp is expired."""

        if self.timestamp.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError("timestamp.json is expired")

    def update_snapshot(
        self, data: bytes, trusted: bool | None = False
    ) -> Snapshot:
        """Verify and load ``data`` as new snapshot metadata.

        Note that an intermediate snapshot is allowed to be expired and version
        is allowed to not match timestamp meta version: ``TrustedMetadataSet``
        will throw an ``ExpiredMetadataError``/``BadVersionNumberError`` in
        these cases but the intermediate snapshot will be loaded. This way a
        newer snapshot can still be loaded (and the intermediate snapshot will
        be used for rollback protection). Expired snapshot or snapshot that
        does not match timestamp meta version will prevent loading targets.

        Args:
            data: Unverified new snapshot metadata as bytes
            trusted: ``True`` if data has at some point been verified by
                ``TrustedMetadataSet`` as a valid snapshot. Purpose of trusted
                is to allow loading of locally stored snapshot as intermediate
                snapshot even if hashes in current timestamp meta no longer
                match data. Default is False.

        Raises:
            RuntimeError: This function is called before updating timestamp
                or after updating targets.
            RepositoryError: Data failed to load or verify as final snapshot.
                The actual error type and content will contain more details.

        Returns:
            Deserialized and verified ``Snapshot`` object
        """

        if Timestamp.type not in self._trusted_set:
            raise RuntimeError("Cannot update snapshot before timestamp")
        if Targets.type in self._trusted_set:
            raise RuntimeError("Cannot update snapshot after targets")
        logger.debug("Updating snapshot")

        # Snapshot cannot be loaded if final timestamp is expired
        self._check_final_timestamp()

        snapshot_meta = self.timestamp.snapshot_meta

        # Verify non-trusted data against the hashes in timestamp, if any.
        # Trusted snapshot data has already been verified once.
        if not trusted:
            snapshot_meta.verify_length_and_hashes(data)

        new_snapshot, _, _ = self._load_data(Snapshot, data, self.root)

        # version not checked against meta version to allow old snapshot to be
        # used in rollback protection: it is checked when targets is updated

        # If an existing trusted snapshot is updated, check for rollback attack
        if Snapshot.type in self._trusted_set:
            for filename, fileinfo in self.snapshot.meta.items():
                new_fileinfo = new_snapshot.meta.get(filename)

                # Prevent removal of any metadata in meta
                if new_fileinfo is None:
                    raise exceptions.RepositoryError(
                        f"New snapshot is missing info for '{filename}'"
                    )

                # Prevent rollback of any metadata versions: the currently
                # trusted version is the minimum acceptable one, the version
                # in the new snapshot is what was actually received.
                if new_fileinfo.version < fileinfo.version:
                    raise exceptions.BadVersionNumberError(
                        f"Expected {filename} version >= "
                        f"{fileinfo.version}, got {new_fileinfo.version}."
                    )

        # expiry not checked to allow old snapshot to be used for rollback
        # protection of new snapshot: it is checked when targets is updated

        self._trusted_set[Snapshot.type] = new_snapshot
        logger.debug("Updated snapshot v%d", new_snapshot.version)

        # snapshot is loaded, but we raise if it's not valid _final_ snapshot
        self._check_final_snapshot()

        return new_snapshot

    def _check_final_snapshot(self) -> None:
        """Raise if snapshot is expired or meta version does not match."""

        if self.snapshot.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError("snapshot.json is expired")
        snapshot_meta = self.timestamp.snapshot_meta
        if self.snapshot.version != snapshot_meta.version:
            raise exceptions.BadVersionNumberError(
                f"Expected snapshot version {snapshot_meta.version}, "
                f"got {self.snapshot.version}"
            )

    def update_targets(self, data: bytes) -> Targets:
        """Verify and load ``data`` as new top-level targets metadata.

        This is ``update_delegated_targets()`` with the top-level targets
        role name and root as the delegator.

        Args:
            data: Unverified new targets metadata as bytes

        Raises:
            RuntimeError: This function is called before updating snapshot.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified ``Targets`` object
        """
        return self.update_delegated_targets(data, Targets.type, Root.type)

    def update_delegated_targets(
        self, data: bytes, role_name: str, delegator_name: str
    ) -> Targets:
        """Verify and load ``data`` as new metadata for target ``role_name``.

        Args:
            data: Unverified new metadata as bytes
            role_name: Role name of the new metadata
            delegator_name: Name of the role delegating to the new metadata

        Raises:
            RuntimeError: This function is called before updating snapshot,
                or before the delegator metadata has been loaded.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified ``Targets`` object
        """
        if Snapshot.type not in self._trusted_set:
            raise RuntimeError("Cannot load targets before snapshot")

        # Targets cannot be loaded if final snapshot is expired or its version
        # does not match meta version in timestamp
        self._check_final_snapshot()

        delegator: Delegator | None = self.get(delegator_name)
        if delegator is None:
            raise RuntimeError("Cannot load targets before delegator")

        logger.debug("Updating %s delegated by %s", role_name, delegator_name)

        # Verify against the hashes in snapshot, if any
        meta = self.snapshot.meta.get(f"{role_name}.json")
        if meta is None:
            raise exceptions.RepositoryError(
                f"Snapshot does not contain information for '{role_name}'"
            )

        meta.verify_length_and_hashes(data)

        new_delegate, _, _ = self._load_data(
            Targets, data, delegator, role_name
        )

        # The loaded version must be exactly the one listed in snapshot
        version = new_delegate.version
        if version != meta.version:
            raise exceptions.BadVersionNumberError(
                f"Expected {role_name} v{meta.version}, got v{version}."
            )

        if new_delegate.is_expired(self.reference_time):
            raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")

        self._trusted_set[role_name] = new_delegate
        logger.debug("Updated %s v%d", role_name, version)

        return new_delegate

    def _load_trusted_root(self, data: bytes) -> None:
        """Verify and load ``data`` as trusted root metadata.

        Note that an expired initial root is considered valid: expiry is
        only checked for the final root in ``update_timestamp()``.
        """
        # No delegator is passed: the initial root is only verified against
        # itself below.
        new_root, new_root_bytes, new_root_signatures = self._load_data(
            Root, data
        )
        new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures)

        self._trusted_set[Root.type] = new_root
        logger.debug("Loaded trusted root v%d", new_root.version)
456
def _load_from_metadata(
    role: type[T],
    data: bytes,
    delegator: Delegator | None = None,
    role_name: str | None = None,
) -> tuple[T, bytes, dict[str, Signature]]:
    """Deserialize traditional metadata and optionally verify its signatures.

    Args:
        role: Expected payload class; the type of the deserialized payload
            must equal ``role.type``.
        data: Serialized metadata as bytes.
        delegator: Metadata whose ``verify_delegate()`` is used to check the
            signatures. If falsy, signature verification is skipped.
        role_name: Name to verify the delegate under. Defaults to
            ``role.type`` when not given.

    Raises:
        RepositoryError: The payload type does not match ``role.type``.
            Deserialization and verification may raise further errors.

    Returns:
        Tuple of deserialized payload, signed payload bytes, and signatures.
    """
    metadata = Metadata[T].from_bytes(data)

    # Reject payloads of an unexpected type before looking at signatures
    if metadata.signed.type != role.type:
        raise exceptions.RepositoryError(
            f"Expected '{role.type}', got '{metadata.signed.type}'"
        )

    if delegator:
        delegate_name = role.type if role_name is None else role_name
        delegator.verify_delegate(
            delegate_name, metadata.signed_bytes, metadata.signatures
        )

    return metadata.signed, metadata.signed_bytes, metadata.signatures
482
483
def _load_from_simple_envelope(
    role: type[T],
    data: bytes,
    delegator: Delegator | None = None,
    role_name: str | None = None,
) -> tuple[T, bytes, dict[str, Signature]]:
    """Deserialize a simple envelope and optionally verify its signatures.

    Args:
        role: Expected payload class; the type of the extracted payload
            must equal ``role.type``.
        data: Serialized envelope as bytes.
        delegator: Metadata whose ``verify_delegate()`` is used to check the
            signatures over ``envelope.pae()``. If falsy, signature
            verification is skipped.
        role_name: Name to verify the delegate under. Defaults to
            ``role.type`` when not given.

    Raises:
        RepositoryError: The envelope payload type is not
            ``SimpleEnvelope.DEFAULT_PAYLOAD_TYPE``, or the extracted payload
            type does not match ``role.type``. Deserialization and
            verification may raise further errors.

    Returns:
        Tuple of deserialized payload, signed payload bytes, and signatures.
    """
    dsse = SimpleEnvelope[T].from_bytes(data)

    if dsse.payload_type != SimpleEnvelope.DEFAULT_PAYLOAD_TYPE:
        raise exceptions.RepositoryError(
            f"Expected '{SimpleEnvelope.DEFAULT_PAYLOAD_TYPE}', "
            f"got '{dsse.payload_type}'"
        )

    # Signatures are checked before the payload itself is extracted
    if delegator:
        delegate_name = role.type if role_name is None else role_name
        delegator.verify_delegate(delegate_name, dsse.pae(), dsse.signatures)

    payload = dsse.get_signed()
    if payload.type != role.type:
        raise exceptions.RepositoryError(
            f"Expected '{role.type}', got '{payload.type}'"
        )

    return payload, dsse.pae(), dsse.signatures